Created
June 14, 2013 02:08
-
-
Save dirocco/5778966 to your computer and use it in GitHub Desktop.
Fixed-size and almost wait-free lockless queue implementation that is simple enough to debug quickly. (Uses OS X atomic/malloc primitives)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/*
Fixed-size, nearly wait-free lockless queue implementation that is simple enough to debug quickly.
Similar to the Array-based Queue-lock on p150 of The Art of Multiprocessor Programming but with pointers
in the queue instead of flags.
Implementation:
Before doing anything, both reader and writer claim their index using atomic add/subtract. This
allows multiple readers and writers to not clobber each other.
After this, the writer modifies the array. The reader spinlocks on this write; the maximum dequeue wait
should only be as long as the CAS in enqueue.
Requires MESI CPU, tested on Intel i7.
*/
#ifndef MULTIPLEX_LOCKLESSQUEUE_H | |
#define MULTIPLEX_LOCKLESSQUEUE_H | |
#include "MultiplexBase.h" | |
#include "PolicyTemplates.h" | |
#include "SystemUtilities.h" | |
#include <libkern/OSAtomic.h> // TODO: Make this OS independant | |
#include <mach/mach.h> | |
namespace mp { | |
// Sentinel stored in every empty queue slot. enqueue() only succeeds when the
// slot it claimed still holds this value, so a failed swap there signals that
// the queue was sized too small (or that slots are being reused too early).
static void* canary = NULL;

// Divisor applied when LocklessQueue computes its spin budget:
//   max_spins = (get_cpu_speed() * time_scale) / quantum_divisor
static const int quantum_divisor = 4;

// Microseconds handed to usleep() by spinning_dequeue() once the spin budget
// has been used up.
static const int sleep_duration = 1;
template<typename T> | |
class LocklessQueue : public NoCopy<LocklessQueue<T> > | |
{ | |
public: | |
LocklessQueue( size_t size, double time_scale ) | |
: size(size), atomic_head(0), atomic_tail(0) | |
{ | |
size_t size_in_bytes = round_page(size * sizeof(T*)); | |
this->size = size_in_bytes / sizeof(T*); | |
vm_allocate(mach_task_self(), (vm_address_t*) &queue, size_in_bytes, true); | |
memset(queue, (long)canary, size_in_bytes); | |
max_spins = (get_cpu_speed() * time_scale) / quantum_divisor; | |
} | |
virtual ~LocklessQueue() | |
{ | |
vm_deallocate(mach_task_self(), (vm_address_t)queue, size); | |
} | |
__inline void enqueue( T* t ) | |
{ | |
uint32_t position = OSAtomicIncrement32(&atomic_tail); | |
position = (position - 1) % size; | |
//if( atomic_head == atomic_tail ) HALT; | |
if( !OSAtomicCompareAndSwapPtr(canary, t, (void**)(&queue[position]))) | |
{ | |
HALT; // You probably made your queue to small, spinlock on queue extension/dequeue here? | |
err_msg("Race condition!\n"); | |
} | |
} | |
__inline T* spinning_dequeue() | |
{ | |
uint64_t counter = 0; | |
T* result = NULL; | |
while ( NULL == (result = dequeue()) ) { | |
if ( counter++ > max_spins ) | |
usleep(sleep_duration); | |
} | |
return result; | |
} | |
__inline T* dequeue() | |
{ | |
if( atomic_head > atomic_tail ) return NULL; | |
uint32_t position = OSAtomicIncrement32(&atomic_head); | |
position = (position - 1) % size; | |
T* result; | |
do { | |
result = queue[position]; // not a barrier read, requires MESI CPU | |
} while( result == canary ); // spinlock should only execute for the duration of enqueue's CAS | |
if(!OSAtomicCompareAndSwapPtr(result, canary, (void**)(&queue[position]))) | |
{ | |
HALT; | |
err_msg("Race condition!\n"); | |
} | |
return result; | |
} | |
private: | |
T** queue; | |
size_t size; | |
int32_t atomic_head; | |
int32_t atomic_tail; | |
uint64_t max_spins; | |
}; | |
}; | |
#endif |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment