Skip to content

Instantly share code, notes, and snippets.

@drbobbeaty
Created February 17, 2011 19:57
Show Gist options
  • Save drbobbeaty/832520 to your computer and use it in GitHub Desktop.
Save drbobbeaty/832520 to your computer and use it in GitHub Desktop.
Improved MP/SC lockless Ring FIFO Queue
/**
* RingFIFO.h - this file defines a multi-producer/single-consumer circular
* FIFO queue and is using the compare-and-swap to achieve
* the lock-free goal. The size of this buffer is given as a
* power of 2 - so saying:
* RingFIFO<int, 10> a;
* makes a ring buffer of 2^10 = 1024 ints.
*/
#ifndef __RINGFIFO_H
#define __RINGFIFO_H
// System Headers
#include <stdint.h>
#include <ostream>
#include <string>
// Third-Party Headers
#include <log4cpp/Category.hh>
// Other Headers
// Forward Declarations
// Public Constants
// Public Datatypes
// Public Data Constants
/**
* This is the main class definition
*/
template <class T, size_t aPowOf2Size> class RingFIFO
{
public:
/*******************************************************************
*
* Constructors/Destructor
*
*******************************************************************/
/**
* This is the default constructor that assumes NOTHING - it just
* makes a simple queue ready to hold things.
*/
RingFIFO() :
mData(),
mHead(0),
mTail(0),
mSleepDelay(),
mHowSleepy(0)
{
// clear out the sleep timer values
mSleepDelay.tv_sec = 0;
}
/**
* This is the standard copy constructor that needs to be in every
* class to make sure that we control how many copies we have
* floating around in the system.
*/
RingFIFO( const RingFIFO<T, aPowOf2Size> & anOther ) :
mData(),
mHead(0),
mTail(0),
mSleepDelay(),
mHowSleepy(0)
{
// let the '=' operator do the heavy lifting...
*this = anOther;
}
/**
* This is the standard destructor and needs to be virtual to make
* sure that if we subclass off this, the right destructor will be
* called.
*/
virtual ~RingFIFO()
{
clear();
}
/**
* When we process the result of an equality we need to make sure
* that we do this right by always having an equals operator on
* all classes.
*/
RingFIFO & operator=( RingFIFO<T, aPowOf2Size> & anOther )
{
if (this != & anOther) {
/**
* Assuming the argument is stable, then we're just going
* to walk it, pushing on the values from it in the right
* order so they are appended to us.
*/
for (size_t i = anOther.mHead; i <= anOther.mTail; ++i) {
if (!push(const_cast<const T &>(anOther.mData[i].value))) {
break;
}
}
}
return *this;
}
RingFIFO & operator=( const RingFIFO<T, aPowOf2Size> & anOther )
{
return operator=((RingFIFO<T, aPowOf2Size> &)anOther);
}
/*******************************************************************
*
* Accessor Methods
*
*******************************************************************/
/**
* This method takes an item and places it in the queue - if it can.
* If so, then it will return 'true', otherwise, it'll return 'false'.
*/
bool push( T & anElem )
{
bool error = false;
// move the tail (insertion point) and get the old value for me
size_t insPt = __sync_fetch_and_add(&mTail, 1) & eBitMask;
// see if we've crashed the queue all the way around...
if (mData[insPt].valid) {
error = true;
cLog.error("[push] the queue is full! We're overwriting the oldest data with the new!");
}
// save the data in the right spot and flag it as valid
const_cast<T &>(mData[insPt].value) = anElem;
mData[insPt].valid = true;
return !error;
}
bool push( const T & anElem )
{
return push(*const_cast<T *>(&anElem));
}
bool push( volatile T & anElem )
{
return push(*const_cast<T *>(&anElem));
}
/**
* This method updates the passed-in reference with the value on the
* top of the queue - if it can. If so, it'll return the value and
* 'true', but if it can't, as in the queue is empty, then the method
* will return 'false' and the value will be untouched.
*/
bool pop( T & anElem )
{
bool error = false;
// see if we have an empty queue...
if (!mData[mHead & eBitMask].valid) {
error = true;
} else {
// move the head, mask it for the location and copy the value
size_t grab = __sync_fetch_and_add(&mHead, 1) & eBitMask;
anElem = const_cast<T &>(mData[grab].value);
mData[grab].valid = false;
// don't forget to tell the sleep() that we got a hit
mHowSleepy = 0;
}
return !error;
}
/**
* If there is an item on the queue, this method will return a look
* at that item without updating the queue. The return value will be
* 'true' if there is something, but 'false' if the queue is empty.
*/
bool peek( T & anElem )
{
bool error = false;
// see if we have an empty queue...
if (!mData[mHead & eBitMask].valid) {
error = true;
} else {
anElem = const_cast<T &>(mData[mHead & eBitMask].value);
}
return !error;
}
/**
* This method will clear out the contents of the queue so if
* you're storing pointers, then you need to be careful as this
* could leak.
*/
void clear()
{
T v;
while (pop(v));
}
/**
* This method will return 'true' if there are no items in the
* queue. Simple.
*/
bool empty()
{
return (mHead == mTail);
}
/**
* This method will return the total number of items in the
* queue.
*/
size_t size() const
{
size_t retval = mTail - mHead;
// see if it's wrapped around - and correct the unsigned math
if (retval > eCapacity) {
retval = (retval + eCapacity) & eBitMask;
}
return retval;
}
/*******************************************************************
*
* Utility Methods
*
*******************************************************************/
/**
* This method will check to see if the queue is empty, and if it
* is, it'll sleep a "little bit" and then return to the caller.
* The reason for all this is that the queue does not have a blocking
* mode, so the only way to service the queue is to poll it. Too
* agressive and you're wasing CPU cycles, too passive and you're
* wasting time. This tries to strike a nice balance.
*/
void sleep( bool aForcedSleep = false )
{
/**
* If we are empty, and we have a non-zero delay to sleep for,
* then do it, but make sure to cap the sleepy index at the size
* of the array of sleepy values.
*/
if ((aForcedSleep || empty()) &&
((mSleepDelay.tv_nsec = cSleepy[mHowSleepy++]) != 0)) {
nanosleep(&mSleepDelay, NULL);
mHowSleepy = (mHowSleepy > cSleepyMax ? cSleepyMax : mHowSleepy);
}
}
/**
* Because there are plenty of times that it's really useful to have
* a human-readable version of this instance's data, we're going to
* have a common method to give us just that - and this is it.
*/
virtual std::string toString() const
{
return "<RingFIFO>";
}
/**
* This method checks to see if two queues are equal in their
* contents and not their pointer values. This is how you'd likely
* expect equality to work.
*/
bool operator==( RingFIFO<T, aPowOf2Size> & anOther ) const
{
bool equals = true;
// check to see if he's as sleepy as me :)
if (equals) {
equals = (mHowSleepy == anOther.mHowSleepy);
}
// next, check the elements for equality
if (equals) {
for (size_t i = mHead, j = anOther.mHead; i <= mTail; ++i, ++j) {
if (const_cast<T &>(mData[i]) != const_cast<T &>(anOther.mData[j])) {
equals = false;
break;
}
}
}
return equals;
}
/**
* This method checks to see if two queues are NOT equal in their
* contents and not their pointer values. This is how you'd likely
* expect equality to work.
*/
bool operator!=( RingFIFO<T, aPowOf2Size> & anOther ) const
{
return !operator=(anOther);
}
private:
/**
* We need a few 'computed' values from the power of 2 size - the
* first is the real capacity and the second is the bit mask for
* locating the index of the array. These will be used in the code
* to make it a lot faster overall.
*/
enum {
eBitMask = ((1 << aPowOf2Size) - 1),
eCapacity = (1 << aPowOf2Size)
};
/**
* In order to simplify the ring buffer access, I'm going to actually
* have the ring a series of 'nodes', and for each, there will be a
* value and a valid 'flag'. If the flag isn't true, then the value
* in the node isn't valid. We'll use this to decouple the push()
* and pop() so that each only needs to know it's own location and
* then interrogate the node for state.
*/
struct Node {
T value;
bool valid;
Node() : value(), valid(false) { }
Node( const T & aValue ) : value(aValue), valid(true) { }
~Node() { }
};
/**
* We have a very simple structure - an array of values of a fixed
* size and a simple head and tail.
*/
volatile Node mData[eCapacity];
volatile size_t mHead;
volatile size_t mTail;
/**
* Because this queue will not block until something is there,
* we need to make it a little easier on the users of this class
* by having a simple sleep() method. Simple, but clever under
* the covers. In order to implement this sleeper I need a few
* things, and these are it.
*/
struct timespec mSleepDelay;
uint8_t mHowSleepy;
static int32_t cSleepy[];
static uint8_t cSleepyMax;
/**
* This is the class reference to the logger for this class.
* It's pthread-aware, so we should be OK to use it by all the
* instances of this class.
*/
static log4cpp::Category & cLog;
};
template <class T, size_t aPowOf2Size> int32_t RingFIFO<T, aPowOf2Size>::cSleepy[] = { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 250000, 250000, 250000, 250000, 250000, 250000, 250000, 250000, 250000, 250000, 500000, 500000, 500000, 500000, 500000, 500000, 500000, 500000, 500000, 500000, 750000, 750000, 750000, 750000, 750000, 750000, 750000, 750000, 750000, 750000, 1000000, 1000000, 1000000, 1000000, 1000000, 1000000, 1000000, 1000000, 1000000, 1000000, 2500000, 2500000, 2500000, 2500000, 2500000, 2500000, 2500000, 2500000, 2500000, 2500000, 5000000, 5000000, 5000000, 5000000, 5000000, 10000000, 10000000, 10000000, 10000000, 10000000, 50000000, 100000000 };
template <class T, size_t aPowOf2Size> uint8_t RingFIFO<T, aPowOf2Size>::cSleepyMax = 41;
template <class T, size_t aPowOf2Size> log4cpp::Category & RingFIFO<T, aPowOf2Size>::cLog = log4cpp::Category::getInstance("RingFIFO<T, aPowOf2Size>");
#endif // __RINGFIFO_H
// vim: set ts=4:
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment