-
-
Save mfrischknecht/a2f00ab8dc6786b0ff2d to your computer and use it in GitHub Desktop.
TaskReadWriteMutex
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import vibe.d; | |
import rwmutex; | |
/** Module constructor: configures and starts the demo HTTP server.
 *
 * Registers a `WebInterface` instance on a fresh `URLRouter` and listens on
 * port 8080 on the IPv6 and IPv4 loopback addresses.
 */
shared static this()
{
	auto config = new HTTPServerSettings;
	config.port = 8080;
	//HTTPServerOption.distribute leads to a crash on the latest dmd version
	//config.options = HTTPServerOption.parseURL | HTTPServerOption.distribute;
	config.bindAddresses = ["::1", "127.0.0.1"];

	auto routes = new URLRouter();
	routes.registerWebInterface(new WebInterface());

	listenHTTP(config, routes);

	//This server can be tested in a terminal using wget and gnu parallel:
	// $ seq 100000 | parallel -n0 wget http://127.0.0.1:8080/ -o /dev/null
}
/** `@before` hook that hands the raw response object through unchanged.
 *
 * It is referenced by `@before!response("response")` on
 * `WebInterface.index`, whose `response` parameter receives the returned
 * value.
 *
 * Params:
 *   response = the response object of the current request
 *
 * Returns: the very same `response` object.
 */
HTTPServerResponse response(HTTPServerRequest, HTTPServerResponse response)
{
	return response;
}
/** Minimal web interface used to exercise a `TaskReadWriteMutex` under load.
 *
 * Each request to `index` picks a random number and, based on it, takes
 * either the writer lock (one value out of twenty) or the reader lock
 * (all other values), sleeps while holding the lock and then answers with
 * an empty body.
 */
class WebInterface
{
	private
	{
		/** The mutex shared by all requests served by this instance. */
		TaskReadWriteMutex _mutex;
	}

	/** Creates the interface together with its (default policy) mutex. */
	this()
	{
		_mutex = new TaskReadWriteMutex();
	}

	/** Request handler: simulates a read or write access to shared data.
	 *
	 * Params:
	 *   response = the response object, injected by the `response` hook
	 */
	@before!response("response")
	void index(HTTPServerResponse response)
	{
		//Only `uniform` is needed here; the previously imported
		//std.stdio was unused.
		import std.random : uniform;

		//uniform(0,20) yields values in [0,20), so exactly one of the
		//twenty possible outcomes selects the writer path.
		auto i = uniform(0,20);
		if (i == 1)
		{
			//Simulated (slow) write access
			synchronized(_mutex.writer)
			{
				vibe.core.core.sleep(50.msecs);
			}
		}
		else
		{
			//Simulated (fast) read access
			synchronized(_mutex.reader)
			{
				vibe.core.core.sleep(1.msecs);
			}
		}

		response.writeBody("");
	}
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** | |
Macros: | |
PHOBOS_DOCS = <a href="http://dlang.org/phobos/$1">$2</a> | |
VIBE_DOCS = <a href="http://vibed.org/api/$1">$2</a> | |
*/ | |
module rwmutex; | |
import core.sync.mutex; | |
import core.atomic; | |
import vibe.d; | |
/** Contains the shared state of a TaskReadWriteMutex.
 *
 * Since a TaskReadWriteMutex consists of two actual Mutex
 * objects that rely on common memory, this struct implements
 * the actual functionality of their method calls.
 *
 * The method implementations are based on two static parameters
 * (INTERRUPTIBLE and INTENT):
 *
 * - INTERRUPTIBLE is a template argument of the struct itself. It
 *   determines whether the waiting operations are performed through
 *   the interruptible primitives (`InterruptibleTaskMutex`,
 *   `ManualEvent.wait`) or through the uninterruptible ones
 *   (`TaskMutex`, `ManualEvent.waitUninterruptible`). Enabling the
 *   interruption support will result in the final mutex implementation
 *   not being nothrow.
 *
 * - INTENT is a template argument of each individual method. It
 *   describes the intent with which a locking operation is
 *   performed (i.e. READ_ONLY or READ_WRITE). RO locking allows for
 *   multiple Tasks holding the mutex, whereas RW locking will cause
 *   a "bottleneck" so that only one Task can write to the protected
 *   data at once.
 */
private struct ReadWriteMutexState(bool INTERRUPTIBLE)
{
	/** The policy with which the mutex should operate.
	 *
	 * The policy determines how the acquisition of the locks is
	 * performed and can be used to tune the mutex according to the
	 * underlying algorithm in which it is used.
	 *
	 * According to the provided policy, the mutex will either favor
	 * reading or writing tasks and could potentially starve the
	 * respective opposite.
	 *
	 * cf. $(PHOBOS_DOCS core_sync_rwmutex.html#Policy, core.sync.rwmutex.ReadWriteMutex.Policy)
	 */
	enum Policy : int
	{
		/** Readers are prioritized, writers may be starved as a result. */
		PREFER_READERS = 0,
		/** Writers are prioritized, readers may be starved as a result. */
		PREFER_WRITERS
	}

	/** The intent with which a locking operation is performed.
	 *
	 * Since both locks share the same underlying algorithms, the actual
	 * intent with which a lock operation is performed (i.e. read/write)
	 * is passed as a template parameter to each method.
	 */
	enum LockingIntent : bool
	{
		/** Perform a read lock/unlock operation. Multiple reading locks can be
		 *  active at a time. */
		READ_ONLY = 0,
		/** Perform a write lock/unlock operation. Only a single writer can
		 *  hold a lock at any given time. */
		READ_WRITE = 1
	}

	private {
		//Queue counters
		/** The number of reading tasks waiting for the lock to become available. */
		shared(uint) _waitingForReadLock = 0;
		/** The number of writing tasks waiting for the lock to become available. */
		shared(uint) _waitingForWriteLock = 0;

		//Lock counters
		/** The number of reading tasks that currently hold the lock. */
		shared(uint) _activeReadLocks = 0;
		/** The number of writing tasks that currently hold the lock (binary). */
		shared(ubyte) _activeWriteLocks = 0;

		/** The policy determining the lock's behavior. */
		Policy _policy;

		//Queue Events
		/** The event used to wake reading tasks waiting for the lock while it is blocked. */
		ManualEvent _readyForReadLock;
		/** The event used to wake writing tasks waiting for the lock while it is blocked. */
		ManualEvent _readyForWriteLock;

		/** The underlying TaskMutex that gates the access to the shared state. */
		static if (INTERRUPTIBLE)
			InterruptibleTaskMutex _counterMutex;
		else
			TaskMutex _counterMutex;
	}

	/** Creates the state for the given policy and allocates the gating
	 *  mutex as well as both queue events. */
	this(Policy policy)
	{
		_policy = policy;
		static if (INTERRUPTIBLE)
			_counterMutex = new InterruptibleTaskMutex();
		else
			_counterMutex = new TaskMutex();
		_readyForReadLock = createManualEvent();
		_readyForWriteLock = createManualEvent();
	}

	//The state must never be copied: both mutex facades have to operate
	//on the very same counters.
	@disable this(this);

	/** The policy with which the lock has been created. */
	@property Policy policy() const { return _policy; }

	version(RWMutexPrint)
	{
		/** Print out debug information during lock operations. */
		void printInfo(string OP, LockingIntent INTENT)() nothrow
		{
			try
			{
				import std.stdio : writefln;
				import std.string : leftJustify;
				writefln("RWMutex: %s (%s), active: RO: %d, RW: %d; waiting: RO: %d, RW: %d",
					OP.leftJustify(10,' '),
					INTENT == LockingIntent.READ_ONLY ? "RO" : "RW",
					atomicLoad(_activeReadLocks), atomicLoad(_activeWriteLocks),
					atomicLoad(_waitingForReadLock), atomicLoad(_waitingForWriteLock)
				);
			}
			//Debug output is best effort: ignore ordinary exceptions
			//(e.g. I/O or formatting problems), but do not swallow Errors.
			catch (Exception e){}
		}
	}

	/** An internal shortcut method to determine the queue event for a given intent. */
	@property ref auto queueEvent(LockingIntent INTENT)()
	{
		static if (INTENT == LockingIntent.READ_ONLY)
			return _readyForReadLock;
		else
			return _readyForWriteLock;
	}

	/** An internal shortcut method to determine the queue counter for a given intent. */
	@property ref auto queueCounter(LockingIntent INTENT)()
	{
		static if (INTENT == LockingIntent.READ_ONLY)
			return _waitingForReadLock;
		else
			return _waitingForWriteLock;
	}

	/** An internal shortcut method to determine the current emitCount of the queue event for a given intent. */
	int emitCount(LockingIntent INTENT)()
	{
		return queueEvent!INTENT.emitCount();
	}

	/** An internal shortcut method to determine the active counter for a given intent. */
	@property ref auto activeCounter(LockingIntent INTENT)()
	{
		static if (INTENT == LockingIntent.READ_ONLY)
			return _activeReadLocks;
		else
			return _activeWriteLocks;
	}

	/** An internal shortcut method to wait for the queue event for a given intent.
	 *
	 * This method is used during the `lock()` operation, after a
	 * `tryLock()` operation has been unsuccessfully finished.
	 * The call is forwarded to `wait()` or `waitUninterruptible()` of the
	 * queue event, depending on INTERRUPTIBLE.
	 *
	 * Params:
	 *   count = the emit count observed before the failed locking attempt
	 *
	 * Returns: the value returned by the event's wait operation, which
	 *   `lock()` uses as the reference count for its next wait.
	 */
	int wait(LockingIntent INTENT)(int count)
	{
		static if (INTERRUPTIBLE)
			return queueEvent!INTENT.wait(count);
		else
			return queueEvent!INTENT.waitUninterruptible(count);
	}

	/** An internal shortcut method to notify tasks waiting for the lock to become available again.
	 *
	 * This method is called whenever the number of owners of the mutex hits
	 * zero; this is basically the counterpart to wait().
	 * It wakes any Task currently waiting for the mutex to be released.
	 */
	@trusted void notify(LockingIntent INTENT)()
	{
		static if (INTENT == LockingIntent.READ_ONLY)
		{ //If the last reader unlocks the mutex, notify all waiting writers
			if (atomicLoad(_waitingForWriteLock) > 0)
				_readyForWriteLock.emit();
		}
		else
		{ //If a writer unlocks the mutex, notify both readers and writers
			if (atomicLoad(_waitingForReadLock) > 0)
				_readyForReadLock.emit();
			if (atomicLoad(_waitingForWriteLock) > 0)
				_readyForWriteLock.emit();
		}
	}

	/** An internal method that performs the acquisition attempt in different variations.
	 *
	 * Since both locks rely on a common TaskMutex object which gates the access
	 * to their common data, acquisition attempts for this lock are more complex
	 * than for simple mutex variants. This method will thus be performing the
	 * `tryLock()` operation in two variations, depending on the caller:
	 *
	 * If called from the outside (WAIT_FOR_BLOCKING_MUTEX = false), the method
	 * will instantly fail if the underlying mutex is locked (i.e. during another
	 * `tryLock()` operation), in order to guarantee the fastest possible
	 * locking attempt.
	 *
	 * If used internally by the `lock()` method (WAIT_FOR_BLOCKING_MUTEX = true),
	 * the operation will wait for the mutex to be available before deciding if
	 * the lock can be acquired, since the attempt would anyway be repeated until
	 * it succeeds. This will prevent frequent retries under heavy loads and thus
	 * should ensure better performance.
	 *
	 * Returns:
	 *   true, if the lock was successfully acquired;
	 *   false otherwise.
	 */
	@trusted bool tryLock(LockingIntent INTENT, bool WAIT_FOR_BLOCKING_MUTEX)()
	{
		//Log a debug statement for the attempt
		version(RWMutexPrint)
			printInfo!("tryLock",INTENT)();

		//Try to acquire the gating mutex
		static if (!WAIT_FOR_BLOCKING_MUTEX)
		{
			if (!_counterMutex.tryLock())
				return false;
		}
		else
			_counterMutex.lock();

		scope(exit)
			_counterMutex.unlock();

		//Log a debug statement for the counter check
		version(RWMutexPrint)
			printInfo!("checkCtrs",INTENT)();

		//NOTE: `unlock()` and `lock()` modify the counters atomically
		//without holding `_counterMutex`, hence all reads below go
		//through `atomicLoad` (just like in `notify()`).

		//Check if there's already an active writer
		if (atomicLoad(_activeWriteLocks) > 0)
			return false;

		//If writers are preferred over readers, check whether there
		//currently is a writer in the waiting queue and abort if
		//that's the case.
		static if (INTENT == LockingIntent.READ_ONLY)
			if (_policy == Policy.PREFER_WRITERS && atomicLoad(_waitingForWriteLock) > 0)
				return false;

		//If we are locking the mutex for writing, make sure that
		//there's no reader active.
		static if (INTENT == LockingIntent.READ_WRITE)
			if (atomicLoad(_activeReadLocks) > 0)
				return false;

		//We can successfully acquire the lock!
		//Log a debug statement for the success.
		version(RWMutexPrint)
			printInfo!("lock",INTENT)();

		//Increase the according counter
		//(number of active readers/writers)
		//and return a success code.
		atomicOp!"+="(activeCounter!INTENT,1);
		return true;
	}

	/** Attempt to acquire the lock for a given intent.
	 *
	 * Returns:
	 *   true, if the lock was successfully acquired;
	 *   false otherwise.
	 */
	@trusted bool tryLock(LockingIntent INTENT)()
	{
		//Try to lock this mutex without waiting for the underlying
		//TaskMutex - fail if it is already blocked.
		return tryLock!(INTENT,false)();
	}

	/** Acquire the lock for the given intent; yield and suspend until the lock has been acquired. */
	@trusted void lock(LockingIntent INTENT)()
	{
		//Sample the emit count before the first tryLock() call in
		//order to avoid a race condition that could lead to a queue
		//notification being missed.
		auto count = emitCount!INTENT;

		//With PREFER_WRITERS, readers refuse the lock while a writer is
		//queued and park on `_readyForReadLock`. If this writer leaves
		//the queue through an exception (interruptible variant only),
		//it never performs the `unlock()` that would wake them, and
		//the last reader's `unlock()` only notifies writers. Wake the
		//readers here so that they re-evaluate the counters.
		//This guard is declared before the counter guard below, so it
		//runs after the queue counter has been decremented.
		static if (INTERRUPTIBLE && INTENT == LockingIntent.READ_WRITE)
			scope(failure)
			{
				if (_policy == Policy.PREFER_WRITERS && atomicLoad(_waitingForReadLock) > 0)
					_readyForReadLock.emit();
			}

		//Register this task in the waiting queue for the whole
		//duration of the locking attempt.
		atomicOp!"+="(queueCounter!INTENT,1);
		scope(exit)
			atomicOp!"-="(queueCounter!INTENT,1);

		//Retry until we successfully acquired the lock
		while (!tryLock!(INTENT,true)())
		{
			version(RWMutexPrint)
				printInfo!("wait",INTENT)();

			count = wait!INTENT(count);
		}
	}

	/** Unlock the mutex after a successful acquisition. */
	@trusted void unlock(LockingIntent INTENT)()
	{
		version(RWMutexPrint)
			printInfo!("unlock",INTENT)();

		debug assert(atomicLoad(activeCounter!INTENT) > 0);

		//Decrement the counter of active lock holders.
		//If the counter hits zero, notify waiting Tasks
		if (atomicOp!"-="(activeCounter!INTENT,1) == 0)
		{
			version(RWMutexPrint)
				printInfo!("notify",INTENT)();

			notify!INTENT();
		}
	}
}
/** A reader/writer mutex for fibers (vibe.d tasks).
 *
 * Intended as a replacement for
 * $(PHOBOS_DOCS core_sync_rwmutex.html, core.sync.rwmutex.ReadWriteMutex)
 * that does not block the event loop in contention situations.
 *
 * Locking is performed through the two members `reader` and `writer`:
 * any number of tasks may hold the `reader` mutex at the same time, while
 * the `writer` mutex can only be held by a single task. Both are mutually
 * exclusive, i.e. while a writer is active no reader can be active and
 * vice versa.
 *
 * Notice:
 *      The locking methods of this class are annotated nothrow, so it
 *      cannot be interrupted using vibe.core.task.Task.interrupt(). The
 *      corresponding InterruptException will be deferred until the next
 *      blocking operation yields the event loop.
 *
 *      Use InterruptibleTaskReadWriteMutex as an alternative that can be
 *      interrupted.
 *
 * cf. $(PHOBOS_DOCS core_sync_rwmutex.html, core.sync.rwmutex.ReadWriteMutex)
 */
class TaskReadWriteMutex
{
	private {
		//Non-interruptible flavor of the shared implementation
		alias State = ReadWriteMutexState!false;
		alias LockingIntent = State.LockingIntent;
		alias READ_ONLY = LockingIntent.READ_ONLY;
		alias READ_WRITE = LockingIntent.READ_WRITE;

		/** The state both `reader` and `writer` operate on. */
		State _state;
	}

	/** The policy with which the mutex should operate.
	 *
	 * Depending on the policy, lock acquisition favors either reading or
	 * writing tasks, which may starve the respective other kind. It can
	 * be used to tune the mutex for the algorithm it is used in.
	 *
	 * cf. $(PHOBOS_DOCS core_sync_rwmutex.html#Policy, core.sync.rwmutex.ReadWriteMutex.Policy)
	 */
	alias Policy = State.Policy;

	/** Common implementation of both provided mutexes.
	 *
	 * The `INTENT` template argument selects whether an instance performs
	 * read or write locking; all operations are forwarded to the shared
	 * state of the enclosing object.
	 */
	final class Mutex(LockingIntent INTENT): core.sync.mutex.Mutex, Lockable
	{
		/** Try to lock the mutex. cf. $(PHOBOS_DOCS core_sync_mutex.html, core.sync.mutex.Mutex) */
		override bool tryLock() nothrow
		{
			return _state.tryLock!INTENT();
		}

		/** Lock the mutex. cf. $(PHOBOS_DOCS core_sync_mutex.html, core.sync.mutex.Mutex) */
		override void lock() nothrow
		{
			_state.lock!INTENT();
		}

		/** Unlock the mutex. cf. $(PHOBOS_DOCS core_sync_mutex.html, core.sync.mutex.Mutex) */
		override void unlock() nothrow
		{
			_state.unlock!INTENT();
		}
	}

	/** The mutex type used for shared (read) locking. */
	alias Reader = Mutex!READ_ONLY;
	/** The mutex type used for exclusive (write) locking. */
	alias Writer = Mutex!READ_WRITE;

	/** Lock this member for read access. */
	Reader reader;
	/** Lock this member for write access. */
	Writer writer;

	/** Creates the mutex pair.
	 *
	 * Params:
	 *   policy = the locking policy, writers are preferred by default
	 */
	this(Policy policy = Policy.PREFER_WRITERS)
	{
		this._state = State(policy);
		this.reader = new Reader();
		this.writer = new Writer();
	}

	/** The policy with which the lock has been created. */
	@property Policy policy() const
	{
		return _state.policy;
	}
}
/** Interruptible counterpart of TaskReadWriteMutex.
 *
 * In contrast to TaskReadWriteMutex, the locking methods of this class are
 * not nothrow, which allows vibe.core.task.Task.interrupt() to be used
 * while a task is waiting in the lock() method.
 *
 * cf. $(PHOBOS_DOCS core_sync_rwmutex.html, core.sync.rwmutex.ReadWriteMutex)
 */
class InterruptibleTaskReadWriteMutex
{
	private {
		//Interruptible flavor of the shared implementation
		alias State = ReadWriteMutexState!true;
		alias LockingIntent = State.LockingIntent;
		alias READ_ONLY = LockingIntent.READ_ONLY;
		alias READ_WRITE = LockingIntent.READ_WRITE;

		/** The state both `reader` and `writer` operate on. */
		State _state;
	}

	/** The policy with which the mutex should operate.
	 *
	 * Depending on the policy, lock acquisition favors either reading or
	 * writing tasks, which may starve the respective other kind. It can
	 * be used to tune the mutex for the algorithm it is used in.
	 *
	 * cf. $(PHOBOS_DOCS core_sync_rwmutex.html#Policy, core.sync.rwmutex.ReadWriteMutex.Policy)
	 */
	alias Policy = State.Policy;

	/** Common implementation of both provided mutexes.
	 *
	 * The `INTENT` template argument selects whether an instance performs
	 * read or write locking; all operations are forwarded to the shared
	 * state of the enclosing object.
	 */
	final class Mutex(LockingIntent INTENT): core.sync.mutex.Mutex, Lockable
	{
		/** Try to lock the mutex. cf. $(PHOBOS_DOCS core_sync_mutex.html, core.sync.mutex.Mutex) */
		override bool tryLock()
		{
			return _state.tryLock!INTENT();
		}

		/** Lock the mutex. cf. $(PHOBOS_DOCS core_sync_mutex.html, core.sync.mutex.Mutex) */
		override void lock()
		{
			_state.lock!INTENT();
		}

		/** Unlock the mutex. cf. $(PHOBOS_DOCS core_sync_mutex.html, core.sync.mutex.Mutex) */
		override void unlock()
		{
			_state.unlock!INTENT();
		}
	}

	/** The mutex type used for shared (read) locking. */
	alias Reader = Mutex!READ_ONLY;
	/** The mutex type used for exclusive (write) locking. */
	alias Writer = Mutex!READ_WRITE;

	/** Lock this member for read access. */
	Reader reader;
	/** Lock this member for write access. */
	Writer writer;

	/** Creates the mutex pair.
	 *
	 * Params:
	 *   policy = the locking policy, writers are preferred by default
	 */
	this(Policy policy = Policy.PREFER_WRITERS)
	{
		this._state = State(policy);
		this.reader = new Reader();
		this.writer = new Writer();
	}

	/** The policy with which the lock has been created. */
	@property Policy policy() const
	{
		return _state.policy;
	}
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment