Skip to content

Instantly share code, notes, and snippets.

@dirocco
Created June 14, 2013 02:08
Show Gist options
  • Save dirocco/5778966 to your computer and use it in GitHub Desktop.
Save dirocco/5778966 to your computer and use it in GitHub Desktop.
Fixed-size and almost wait-free lockless queue implementation that is simple enough to debug quickly. (Uses OS X atomic/malloc primitives)
/*
Fixed-size, nearly wait-free lockless queue implementation that is simple enough to debug quickly.
Similar to the Array-based Queue-lock on p150 of The Art of Multiprocessor Programming but with pointers
in the queue instead of flags.
Implementation:
Before doing anything else, both reader and writer claim their slot index with an atomic increment. This
lets multiple readers and writers avoid clobbering each other.
After this, the writer stores its pointer into the claimed slot. The reader spins until that store is
visible, so the maximum dequeue wait should only be as long as the CAS in enqueue.
Requires MESI CPU, tested on Intel i7.
*/
#ifndef MULTIPLEX_LOCKLESSQUEUE_H
#define MULTIPLEX_LOCKLESSQUEUE_H
#include "MultiplexBase.h"
#include "PolicyTemplates.h"
#include "SystemUtilities.h"
#include <libkern/OSAtomic.h> // TODO: Make this OS independent
#include <mach/mach.h>
namespace mp {
// Divisor applied to (get_cpu_speed() * time_scale) to obtain a queue's spin budget (max_spins).
static const int quantum_divisor = 4;
// Microseconds passed to usleep() on each retry once the spin budget has been exceeded.
static const int sleep_duration = 1;
// Sentinel stored in every queue slot that holds no element. enqueue() only publishes into a slot
// that still contains it, and dequeue() restores it after taking an element. The pointer itself is
// const so the sentinel cannot be changed while queues that rely on it exist.
static void* const canary = NULL;
/**
 * Fixed-capacity lockless queue of T* built on the OS X OSAtomic primitives.
 *
 * Every slot holds either `canary` (empty) or a pointer supplied by a producer.
 * Producers and consumers each claim a slot by atomically incrementing their own
 * counter (tail / head). A producer then publishes its pointer into the claimed
 * slot with a CAS; a consumer waits for the slot to be published, takes the
 * pointer and resets the slot to `canary`.
 *
 * The queue stores raw pointers only and never frees what they point to.
 * `canary` is reserved as the empty marker and therefore cannot be enqueued.
 *
 * Known limitation: head and tail are 32-bit counters that are compared directly
 * and reduced modulo the capacity, so behaviour is only defined while they have
 * not wrapped around.
 */
template<typename T>
class LocklessQueue : public NoCopy<LocklessQueue<T> >
{
public:
    /**
     * @param size        requested minimum number of slots; the real capacity is
     *                    rounded up so that the backing store fills whole pages.
     *                    A request for 0 slots is treated as a request for 1.
     * @param time_scale  multiplier applied to get_cpu_speed() when deriving the
     *                    number of polls spinning_dequeue() makes before sleeping.
     */
    LocklessQueue( size_t size, double time_scale )
        : queue(NULL), size(size), atomic_head(0), atomic_tail(0), max_spins(0)
    {
        // A zero-slot queue would make every `% size` below divide by zero.
        const size_t requested_slots = (size == 0) ? 1 : size;
        const size_t size_in_bytes = round_page(requested_slots * sizeof(T*));
        this->size = size_in_bytes / sizeof(T*);

        // The final argument `true` converts to 1, i.e. "place the region anywhere".
        if( vm_allocate(mach_task_self(), (vm_address_t*) &queue, size_in_bytes, true) != KERN_SUCCESS )
        {
            queue = NULL; // keeps the destructor from releasing a bogus address
            err_msg("LocklessQueue: vm_allocate failed!\n");
            HALT;
            return;
        }

        // Mark every slot empty. Assigning the sentinel slot by slot stays correct
        // for any sentinel value, unlike a byte-wise memset.
        for( size_t slot = 0; slot < this->size; ++slot )
            queue[slot] = static_cast<T*>(canary);

        max_spins = (get_cpu_speed() * time_scale) / quantum_divisor;
    }

    /** Releases the slot array. Pointers still stored in the queue are not freed. */
    virtual ~LocklessQueue()
    {
        // vm_deallocate takes a size in BYTES; `size` is a slot count.
        if( queue != NULL )
            vm_deallocate(mach_task_self(), (vm_address_t)queue, size * sizeof(T*));
    }

    /**
     * Appends `t` to the queue.
     *
     * Claims the next tail slot and publishes `t` into it. If the claimed slot is
     * not empty (the queue was sized too small and the producers have lapped the
     * consumers) the failure is reported via err_msg and HALT.
     *
     * @param t pointer to store; must not be the `canary` sentinel (NULL).
     */
    __inline void enqueue( T* t )
    {
        // Storing the sentinel would leave the claimed slot looking empty, and the
        // consumer that claims it would wait until some later producer refilled it.
        if( t == canary )
        {
            err_msg("Cannot enqueue the canary value!\n");
            HALT;
            return;
        }

        uint32_t position = OSAtomicIncrement32(&atomic_tail);
        position = (position - 1) % size; // the increment returns the NEW value
        if( !OSAtomicCompareAndSwapPtr(canary, t, (void**)(&queue[position])))
        {
            // You probably made your queue too small; spinlock on queue extension/dequeue here?
            // Logged before halting so the message is not lost if HALT does not return.
            err_msg("Race condition!\n");
            HALT;
        }
    }

    /**
     * Polls dequeue() until it yields an element. After more than max_spins
     * unsuccessful polls, each further retry first sleeps for sleep_duration usecs.
     *
     * @return the dequeued pointer; never NULL.
     */
    __inline T* spinning_dequeue()
    {
        uint64_t counter = 0;
        T* result = NULL;
        while ( NULL == (result = dequeue()) ) {
            if ( counter++ > max_spins )
                usleep(sleep_duration);
        }
        return result;
    }

    /**
     * Removes and returns the oldest element.
     *
     * @return the dequeued pointer, or NULL when no element is available.
     */
    __inline T* dequeue()
    {
        // Empty once consumers have claimed every slot that producers have claimed.
        // (`>` here would let a consumer claim a slot nobody is writing and spin on
        // it indefinitely instead of reporting "empty".)
        // NOTE: this check and the claim below are not one atomic step, so with
        // several consumers one of them can still claim a slot ahead of the
        // producers; it then simply waits in the loop below until that slot is filled.
        if( atomic_head >= atomic_tail ) return NULL;

        uint32_t position = OSAtomicIncrement32(&atomic_head);
        position = (position - 1) % size; // the increment returns the NEW value

        // Read through a volatile-qualified pointer so the compiler has to reload
        // the slot on every iteration instead of hoisting the load out of the loop.
        T* volatile* slot = &queue[position];
        T* result;
        do {
            result = *slot; // not a barrier read, requires MESI CPU
        } while( result == canary ); // should only spin for the duration of enqueue's CAS

        // Hand the slot back to the producers.
        if(!OSAtomicCompareAndSwapPtr(result, canary, (void**)(&queue[position])))
        {
            // Logged before halting so the message is not lost if HALT does not return.
            err_msg("Race condition!\n");
            HALT;
        }
        return result;
    }

private:
    T** queue;                    // page-rounded array of slots; `canary` marks an empty slot
    size_t size;                  // capacity in slots (not bytes)
    volatile int32_t atomic_head; // number of slots claimed by consumers so far
    volatile int32_t atomic_tail; // number of slots claimed by producers so far
    uint64_t max_spins;           // polls spinning_dequeue() makes before it starts sleeping
};
};
#endif
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment