Skip to content

Instantly share code, notes, and snippets.

@mfrischknecht
Last active September 26, 2015 13:25
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mfrischknecht/a2f00ab8dc6786b0ff2d to your computer and use it in GitHub Desktop.
Save mfrischknecht/a2f00ab8dc6786b0ff2d to your computer and use it in GitHub Desktop.
TaskReadWriteMutex
import vibe.d;
import rwmutex;
shared static this()
{
auto router = new URLRouter();
router.registerWebInterface(new WebInterface());
auto settings = new HTTPServerSettings;
settings.port = 8080;
//HTTPServerOption.distribute leads to a crash on the latest dmd version
//settings.options = HTTPServerOption.parseURL | HTTPServerOption.distribute;
settings.bindAddresses = ["::1", "127.0.0.1"];
listenHTTP(settings, router);
//This server can be tested in a terminal using wget and gnu parallel:
// $ seq 100000 | parallel -n0 wget http://127.0.0.1:8080/ -o /dev/null
}
auto response(HTTPServerRequest, HTTPServerResponse response)
{
return response;
}
class WebInterface
{
private
{
TaskReadWriteMutex _mutex;
}
this()
{
_mutex = new TaskReadWriteMutex();
}
@before!response("response")
void index(HTTPServerResponse response)
{
import std.random;
import std.stdio;
auto i = uniform(0,20);
if (i == 1)
{
synchronized(_mutex.writer)
{
vibe.core.core.sleep(50.msecs);
}
}
else
{
synchronized(_mutex.reader)
{
vibe.core.core.sleep(1.msecs);
}
}
response.writeBody("");
}
}
/**
Macros:
PHOBOS_DOCS = <a href="http://dlang.org/phobos/$1">$2</a>
VIBE_DOCS = <a href="http://vibed.org/api/$1">$2</a>
*/
module rwmutex;
import core.sync.mutex;
import core.atomic;
import vibe.d;
/** Contains the shared state of a TaskReadWriteMutex.
*
* Since a TaskReadWriteMutex consists of two actual Mutex
* objects that rely on common memory, this class implements
* the actual functionality of their method calls.
*
* The method implementations are based on two static parameters
* (INTERRUPTIBLE and INTENT), which are configured through
* template arguments:
*
* - INTERRUPTIBLE determines whether the mutex implementation
* are interruptible by vibe.d's vibe.core.task.Task.interrupt()
* method or not. Enabling the interruption support will result
* in the final mutex implementation not being nothrow.
*
* - INTENT describes the intent, with which a locking operation is
* performed (i.e. READ_ONLY or READ_WRITE). RO locking allows for
* multiple Tasks holding the mutex, whereas RW locking will cause
* a "bottleneck" so that only one Task can write to the protected
* data at once.
*/
private struct ReadWriteMutexState(bool INTERRUPTIBLE)
{
/** The policy with which the mutex should operate.
*
* The policy determines how the acquisition of the locks is
* performed and can be used to tune the mutex according to the
* underlying algorithm in which it is used.
*
* According to the provided policy, the mutex will either favor
* reading or writing tasks and could potentially starve the
* respective opposite.
*
* cf. $(PHOBOS_DOCS core_sync_rwmutex.html#Policy, core.sync.rwmutex.ReadWriteMutex.Policy)
*/
enum Policy : int
{
/** Readers are prioritized, writers may be starved as a result. */
PREFER_READERS = 0,
/** Writers are prioritized, readers may be starved as a result. */
PREFER_WRITERS
}
/** The intent with which a locking operation is performed.
*
* Since both locks share the same underlying algorithms, the actual
* intent with which a lock operation is performed (i.e read/write)
* are passed as a template parameter to each method.
*/
enum LockingIntent : bool
{
/** Perform a read lock/unlock operation. Multiple reading locks can be
* active at a time. */
READ_ONLY = 0,
/** Perform a write lock/unlock operation. Only a single writer can
* hold a lock at any given time. */
READ_WRITE = 1
}
private {
//Queue counters
/** The number of reading tasks waiting for the lock to become available. */
shared(uint) _waitingForReadLock = 0;
/** The number of writing tasks waiting for the lock to become available. */
shared(uint) _waitingForWriteLock = 0;
//Lock counters
/** The number of reading tasks that currently hold the lock. */
shared(uint) _activeReadLocks = 0;
/** The number of writing tasks that currently hold the lock (binary). */
shared(ubyte) _activeWriteLocks = 0;
/** The policy determining the lock's behavior. */
Policy _policy;
//Queue Events
/** The event used to wake reading tasks waiting for the lock while it is blocked. */
ManualEvent _readyForReadLock;
/** The event used to wake writing tasks waiting for the lock while it is blocked. */
ManualEvent _readyForWriteLock;
/** The underlying TaskMutex that gates the access to the shared state. */
static if (INTERRUPTIBLE)
InterruptibleTaskMutex _counterMutex;
else
TaskMutex _counterMutex;
}
this(Policy policy)
{
_policy = policy;
static if (INTERRUPTIBLE)
_counterMutex = new InterruptibleTaskMutex();
else
_counterMutex = new TaskMutex();
_readyForReadLock = createManualEvent();
_readyForWriteLock = createManualEvent();
}
@disable this(this);
/** The policy with which the lock has been created. */
@property Policy policy() const { return _policy; }
version(RWMutexPrint)
{
/** Print out debug information during lock operations. */
void printInfo(string OP, LockingIntent INTENT)() nothrow
{
try
{
import std.stdio;
writefln("RWMutex: %s (%s), active: RO: %d, RW: %d; waiting: RO: %d, RW: %d",
OP.leftJustify(10,' '),
INTENT == LockingIntent.READ_ONLY ? "RO" : "RW",
_activeReadLocks, _activeWriteLocks,
_waitingForReadLock, _waitingForWriteLock
);
}
catch (Throwable t){}
}
}
/** An internal shortcut method to determine the queue event for a given intent. */
@property ref auto queueEvent(LockingIntent INTENT)()
{
static if (INTENT == LockingIntent.READ_ONLY)
return _readyForReadLock;
else
return _readyForWriteLock;
}
/** An internal shortcut method to determine the queue counter for a given intent. */
@property ref auto queueCounter(LockingIntent INTENT)()
{
static if (INTENT == LockingIntent.READ_ONLY)
return _waitingForReadLock;
else
return _waitingForWriteLock;
}
/** An internal shortcut method to determine the current emitCount of the queue counter for a given intent. */
int emitCount(LockingIntent INTENT)()
{
return queueEvent!INTENT.emitCount();
}
/** An internal shortcut method to determine the active counter for a given intent. */
@property ref auto activeCounter(LockingIntent INTENT)()
{
static if (INTENT == LockingIntent.READ_ONLY)
return _activeReadLocks;
else
return _activeWriteLocks;
}
/** An internal shortcut method to wait for the queue event for a given intent.
*
* This method is used during the `lock()` operation, after a
* `tryLock()` operation has been unsuccessfully finished.
* The active fiber will yield and be suspended until the queue event
* for the given intent will be fired.
*/
int wait(LockingIntent INTENT)(int count)
{
static if (INTERRUPTIBLE)
return queueEvent!INTENT.wait(count);
else
return queueEvent!INTENT.waitUninterruptible(count);
}
/** An internal shortcut method to notify tasks waiting for the lock to become available again.
*
* This method is called whenever the number of owners of the mutex hits
* zero; this is basically the counterpart to wait().
* It wakes any Task currently waiting for the mutex to be released.
*/
@trusted void notify(LockingIntent INTENT)()
{
static if (INTENT == LockingIntent.READ_ONLY)
{ //If the last reader unlocks the mutex, notify all waiting writers
if (atomicLoad(_waitingForWriteLock) > 0)
_readyForWriteLock.emit();
}
else
{ //If a writer unlocks the mutex, notify both readers and writers
if (atomicLoad(_waitingForReadLock) > 0)
_readyForReadLock.emit();
if (atomicLoad(_waitingForWriteLock) > 0)
_readyForWriteLock.emit();
}
}
/** An internal method that performs the acquisition attempt in different variations.
*
* Since both locks rely on a common TaskMutex object which gates the access
* to their common data acquisition attempts for this lock are more complex
* than for simple mutex variants. This method will thus be performing the
* `tryLock()` operation in two variations, depending on the callee:
*
* If called from the outside (WAIT_FOR_BLOCKING_MUTEX = false), the method
* will instantly fail if the underlying mutex is locked (i.e. during another
* `tryLock()` or `unlock()` operation), in order to guarantee the fastest
* possible locking attempt.
*
* If used internally by the `lock()` method (WAIT_FOR_BLOCKING_MUTEX = true),
* the operation will wait for the mutex to be available before deciding if
* the lock can be acquired, since the attempt would anyway be repeated until
* it succeeds. This will prevent frequent retries under heavy loads and thus
* should ensure better performance.
*/
@trusted bool tryLock(LockingIntent INTENT, bool WAIT_FOR_BLOCKING_MUTEX)()
{
//Log a debug statement for the attempt
version(RWMutexPrint)
printInfo!("tryLock",INTENT)();
//Try to acquire the lock
static if (!WAIT_FOR_BLOCKING_MUTEX)
{
if (!_counterMutex.tryLock())
return false;
}
else
_counterMutex.lock();
scope(exit)
_counterMutex.unlock();
//Log a debug statement for the attempt
version(RWMutexPrint)
printInfo!("checkCtrs",INTENT)();
//Check if there's already an active writer
if (_activeWriteLocks > 0)
return false;
//If writers are preferred over readers, check whether there
//currently is a writer in the waiting queue and abort if
//that's the case.
static if (INTENT == LockingIntent.READ_ONLY)
if (_policy == Policy.PREFER_WRITERS && _waitingForWriteLock > 0)
return false;
//If we are locking the mutex for writing, make sure that
//there's no reader active.
static if (INTENT == LockingIntent.READ_WRITE)
if (_activeReadLocks > 0)
return false;
//We can successfully acquire the lock!
//Log a debug statement for the success.
version(RWMutexPrint)
printInfo!("lock",INTENT)();
//Increase the according counter
//(number of active readers/writers)
//and return a success code.
atomicOp!"+="(activeCounter!INTENT,1);
return true;
}
/** Attempt to acquire the lock for a given intent.
*
* Returns:
* true, if the lock was successfully acquired;
* false otherwise.
*/
@trusted bool tryLock(LockingIntent INTENT)()
{
//Try to lock this mutex without waiting for the underlying
//TaskMutex - fail if it is already blocked.
return tryLock!(INTENT,false)();
}
/** Acquire the lock for the given intent; yield and suspend until the lock has been acquired. */
@trusted void lock(LockingIntent INTENT)()
{
//Prepare a waiting action before the first
//tryLock() call in order to avoid a race
//condition that could lead to the queue notification
//not being fired.
auto count = emitCount!INTENT;
atomicOp!"+="(queueCounter!INTENT,1);
scope(exit)
atomicOp!"-="(queueCounter!INTENT,1);
//Try to lock the mutex
auto locked = tryLock!(INTENT,true)();
if (locked)
return;
//Retry until we successfully acquired the lock
while(!locked)
{
version(RWMutexPrint)
printInfo!("wait",INTENT)();
count = wait!INTENT(count);
locked = tryLock!(INTENT,true)();
}
}
/** Unlock the mutex after a successful acquisition. */
@trusted void unlock(LockingIntent INTENT)()
{
version(RWMutexPrint)
printInfo!("unlock",INTENT)();
debug assert(activeCounter!INTENT > 0);
//Decrement the counter of active lock holders.
//If the counter hits zero, notify waiting Tasks
if (atomicOp!"-="(activeCounter!INTENT,1) == 0)
{
version(RWMutexPrint)
printInfo!("notify",INTENT)();
notify!INTENT();
}
}
}
/** A ReadWriteMutex implementation for fibers.
*
* This mutex can be used in exchange for a $(PHOBOS_DOCS core_sync_rwmutex.html, core.sync.mutex.ReadWriteMutex),
* but does not block the event loop in contention situations. The `reader` and `writer`
* members are used for locking. Locking the `reader` mutex allows access to multiple
* readers at once, while the `writer` mutex only allows a single writer to lock it at
* any given time. Locks on `reader` and `writer` are mutually exclusive (i.e. whenever a
* writer is active, no readers can be active at the same time, and vice versa).
*
* Notice:
* Because this class is annotated nothrow, it cannot be interrupted
* using vibe.core.task.Task.interrupt(). The corresponding
* InterruptException will be deferred until the next blocking
* operation yields the event loop.
*
* Use InterruptibleTaskReadWriteMutex as an alternative that can be
* interrupted.
*
* cf. $(PHOBOS_DOCS core_sync_rwmutex.html, core.sync.mutex.ReadWriteMutex)
*/
class TaskReadWriteMutex
{
private {
alias State = ReadWriteMutexState!false;
alias LockingIntent = State.LockingIntent;
alias READ_ONLY = LockingIntent.READ_ONLY;
alias READ_WRITE = LockingIntent.READ_WRITE;
/** The shared state used by the reader and writer mutexes. */
State _state;
}
/** The policy with which the mutex should operate.
*
* The policy determines how the acquisition of the locks is
* performed and can be used to tune the mutex according to the
* underlying algorithm in which it is used.
*
* According to the provided policy, the mutex will either favor
* reading or writing tasks and could potentially starve the
* respective opposite.
*
* cf. $(PHOBOS_DOCS core_sync_rwmutex.html#Policy, core.sync.rwmutex.ReadWriteMutex.Policy)
*/
alias Policy = State.Policy;
/** A common baseclass for both of the provided mutexes.
*
* The intent for the according mutex is specified through the
* `INTENT` template argument, which determines if a mutex is
* used for read or write locking.
*/
final class Mutex(LockingIntent INTENT): core.sync.mutex.Mutex, Lockable
{
/** Try to lock the mutex. cf. $(PHOBOS_DOCS core_sync_mutex.html, core.sync.mutex.Mutex) */
override bool tryLock() nothrow { return _state.tryLock!INTENT(); }
/** Lock the mutex. cf. $(PHOBOS_DOCS core_sync_mutex.html, core.sync.mutex.Mutex) */
override void lock() nothrow { _state.lock!INTENT(); }
/** Unlock the mutex. cf. $(PHOBOS_DOCS core_sync_mutex.html, core.sync.mutex.Mutex) */
override void unlock() nothrow { _state.unlock!INTENT(); }
}
alias Reader = Mutex!READ_ONLY;
alias Writer = Mutex!READ_WRITE;
Reader reader;
Writer writer;
this(Policy policy = Policy.PREFER_WRITERS)
{
_state = State(policy);
reader = new Reader();
writer = new Writer();
}
/** The policy with which the lock has been created. */
@property Policy policy() const { return _state.policy; }
}
/** Alternative to TaskReadWriteMutex that supports interruption.
*
* This class supports the use of vibe.core.task.Task.interrupt() while
* waiting in the lock() method.
*
* cf. $(PHOBOS_DOCS core_sync_rwmutex.html, core.sync.mutex.ReadWriteMutex)
*/
class InterruptibleTaskReadWriteMutex
{
private {
alias State = ReadWriteMutexState!true;
alias LockingIntent = State.LockingIntent;
alias READ_ONLY = LockingIntent.READ_ONLY;
alias READ_WRITE = LockingIntent.READ_WRITE;
/** The shared state used by the reader and writer mutexes. */
State _state;
}
/** The policy with which the mutex should operate.
*
* The policy determines how the acquisition of the locks is
* performed and can be used to tune the mutex according to the
* underlying algorithm in which it is used.
*
* According to the provided policy, the mutex will either favor
* reading or writing tasks and could potentially starve the
* respective opposite.
*
* cf. $(PHOBOS_DOCS core_sync_rwmutex.html#Policy, core.sync.rwmutex.ReadWriteMutex.Policy)
*/
alias Policy = State.Policy;
/** A common baseclass for both of the provided mutexes.
*
* The intent for the according mutex is specified through the
* `INTENT` template argument, which determines if a mutex is
* used for read or write locking.
*
*/
final class Mutex(LockingIntent INTENT): core.sync.mutex.Mutex, Lockable
{
/** Try to lock the mutex. cf. $(PHOBOS_DOCS core_sync_mutex.html, core.sync.mutex.Mutex) */
override bool tryLock() { return _state.tryLock!INTENT(); }
/** Lock the mutex. cf. $(PHOBOS_DOCS core_sync_mutex.html, core.sync.mutex.Mutex) */
override void lock() { _state.lock!INTENT(); }
/** Unlock the mutex. cf. $(PHOBOS_DOCS core_sync_mutex.html, core.sync.mutex.Mutex) */
override void unlock() { _state.unlock!INTENT(); }
}
alias Reader = Mutex!READ_ONLY;
alias Writer = Mutex!READ_WRITE;
Reader reader;
Writer writer;
this(Policy policy = Policy.PREFER_WRITERS)
{
_state = State(policy);
reader = new Reader();
writer = new Writer();
}
/** The policy with which the lock has been created. */
@property Policy policy() const { return _state.policy; }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment