Skip to content

Instantly share code, notes, and snippets.

@drbobbeaty
Created February 16, 2011 22:16
Show Gist options
  • Save drbobbeaty/830383 to your computer and use it in GitHub Desktop.
Save drbobbeaty/830383 to your computer and use it in GitHub Desktop.
This is my lockless Ring FIFO Queue
/**
* RingFIFO.h - this file defines a multi-producer/single-consumer circular
* FIFO queue and is using the compare-and-swap to achieve
* the lock-free goal. The size of this buffer is given as a
* power of 2 - so saying:
* RingFIFO<int, 10> a;
* makes a ring buffer of 2^10 = 1024 ints.
*/
#ifndef __RINGFIFO_H
#define __RINGFIFO_H
// System Headers
#include <stdint.h>
#include <ostream>
#include <string>
// Third-Party Headers
#include <log4cpp/Category.hh>
// Other Headers
// Forward Declarations
// Public Constants
// Public Datatypes
// Public Data Constants
/**
* This is the main class definition
*/
template <class T, size_t aPowOf2Size> class RingFIFO
{
public:
/*******************************************************************
*
* Constructors/Destructor
*
*******************************************************************/
/**
* This is the default constructor that assumes NOTHING - it just
* makes a simple queue ready to hold things.
*/
RingFIFO() :
mData(),
mHead(0),
mTail(0),
mInsPt(0),
mSleepDelay(),
mHowSleepy(0)
{
// clear out the sleep timer values
mSleepDelay.tv_sec = 0;
}
/**
* This is the standard copy constructor that needs to be in every
* class to make sure that we control how many copies we have
* floating around in the system.
*/
RingFIFO( const RingFIFO<T, aPowOf2Size> & anOther ) :
mData(),
mHead(0),
mTail(0),
mInsPt(0),
mSleepDelay(),
mHowSleepy(0)
{
// let the '=' operator do the heavy lifting...
*this = anOther;
}
/**
* This is the standard destructor and needs to be virtual to make
* sure that if we subclass off this, the right destructor will be
* called.
*/
virtual ~RingFIFO()
{
clear();
}
/**
* When we process the result of an equality we need to make sure
* that we do this right by always having an equals operator on
* all classes.
*/
RingFIFO & operator=( RingFIFO<T, aPowOf2Size> & anOther )
{
if (this != & anOther) {
/**
* Assuming the argument is stable, then we're just going
* to walk it, pushing on the values from it in the right
* order so they are appended to us.
*/
for (size_t i = anOther.mHead; i <= anOther.mTail; ++i) {
if (!push(const_cast<const T &>(anOther.mData[i]))) {
break;
}
}
}
return *this;
}
RingFIFO & operator=( const RingFIFO<T, aPowOf2Size> & anOther )
{
return operator=((RingFIFO<T, aPowOf2Size> &)anOther);
}
/*******************************************************************
*
* Accessor Methods
*
*******************************************************************/
/**
* This method takes an item and places it in the queue - if it can.
* If so, then it will return 'true', otherwise, it'll return 'false'.
*/
bool push( T & anElem )
{
bool error = false;
// move the insertion point and get the old value for me
size_t insPt = __sync_fetch_and_add(&mInsPt, 1);
// see if we've crashed the queue all the way around...
if (__sync_bool_compare_and_swap(&mHead, (insPt + 1), mHead)) {
error = true;
cLog.error("[push] the queue is full! Can't push this item");
__sync_fetch_and_sub(&mInsPt, 1);
} else {
// save the data in the right spot for the size
const_cast<T &>(mData[insPt & eBitMask]) = anElem;
// ...and finally update the 'tail' for a new item
__sync_fetch_and_add(&mTail, 1);
}
return !error;
}
bool push( const T & anElem )
{
return push(*const_cast<T *>(&anElem));
}
bool push( volatile T & anElem )
{
return push(*const_cast<T *>(&anElem));
}
/**
* This method updates the passed-in reference with the value on the
* top of the queue - if it can. If so, it'll return the value and
* 'true', but if it can't, as in the queue is empty, then the method
* will return 'false' and the value will be untouched.
*/
bool pop( T & anElem )
{
bool error = false;
// see if we have an empty queue...
if (__sync_bool_compare_and_swap(&mHead, mTail, mHead)) {
error = true;
} else {
// move the head, mask it for the location and copy the value
anElem = const_cast<T &>(mData[__sync_fetch_and_add(&mHead, 1) & eBitMask]);
// don't forget to tell the sleep() that we got a hit
mHowSleepy = 0;
}
return !error;
}
/**
* If there is an item on the queue, this method will return a look
* at that item without updating the queue. The return value will be
* 'true' if there is something, but 'false' if the queue is empty.
*/
bool peek( T & anElem )
{
bool error = false;
// see if we have an empty queue...
if (__sync_bool_compare_and_swap(&mHead, mTail, mHead)) {
error = true;
} else {
anElem = const_cast<T &>(mData[mHead & eBitMask]);
}
return !error;
}
/**
* This method will clear out the contents of the queue so if
* you're storing pointers, then you need to be careful as this
* could leak.
*/
void clear()
{
T v;
while (pop(v));
}
/**
* This method will return 'true' if there are no items in the
* queue. Simple.
*/
bool empty()
{
return (mHead == mTail);
}
/**
* This method will return the total number of items in the
* queue.
*/
size_t size() const
{
size_t retval = mTail - mHead;
// see if it's wrapped around - and correct the unsigned math
if (retval > eCapacity) {
retval = (retval + eCapacity) & eBitMask;
}
return retval;
}
/*******************************************************************
*
* Utility Methods
*
*******************************************************************/
/**
* This method will check to see if the queue is empty, and if it
* is, it'll sleep a "little bit" and then return to the caller.
* The reason for all this is that the queue does not have a blocking
* mode, so the only way to service the queue is to poll it. Too
* agressive and you're wasing CPU cycles, too passive and you're
* wasting time. This tries to strike a nice balance.
*/
void sleep( bool aForcedSleep = false )
{
/**
* If we are empty, and we have a non-zero delay to sleep for,
* then do it, but make sure to cap the sleepy index at the size
* of the array of sleepy values.
*/
if ((aForcedSleep || empty()) &&
((mSleepDelay.tv_nsec = cSleepy[mHowSleepy++]) != 0)) {
nanosleep(&mSleepDelay, NULL);
mHowSleepy = (mHowSleepy > cSleepyMax ? cSleepyMax : mHowSleepy);
}
}
/**
* Because there are plenty of times that it's really useful to have
* a human-readable version of this instance's data, we're going to
* have a common method to give us just that - and this is it.
*/
virtual std::string toString() const
{
return "<RingFIFO>";
}
/**
* This method checks to see if two queues are equal in their
* contents and not their pointer values. This is how you'd likely
* expect equality to work.
*/
bool operator==( RingFIFO<T, aPowOf2Size> & anOther ) const
{
bool equals = true;
// check to see if he's as sleepy as me :)
if (equals) {
equals = (mHowSleepy == anOther.mHowSleepy);
}
// next, check the elements for equality
if (equals) {
for (size_t i = mHead, j = anOther.mHead; i <= mTail; ++i, ++j) {
if (const_cast<T &>(mData[i]) != const_cast<T &>(anOther.mData[j])) {
equals = false;
break;
}
}
}
return equals;
}
/**
* This method checks to see if two queues are NOT equal in their
* contents and not their pointer values. This is how you'd likely
* expect equality to work.
*/
bool operator!=( RingFIFO<T, aPowOf2Size> & anOther ) const
{
return !operator=(anOther);
}
private:
/**
* We need a few 'computed' values from the power of 2 size - the
* first is the real capacity and the second is the bit mask for
* locating the index of the array. These will be used in the code
* to make it a lot faster overall.
*/
enum {
eBitMask = ((1 << aPowOf2Size) - 1),
eCapacity = (1 << aPowOf2Size)
};
/**
* We have a very simple structure - an array of values of a fixed
* size and a simple head and tail.
*/
volatile T mData[eCapacity];
volatile size_t mHead;
volatile size_t mTail;
volatile size_t mInsPt;
/**
* Because this queue will not block until something is there,
* we need to make it a little easier on the users of this class
* by having a simple sleep() method. Simple, but clever under
* the covers. In order to implement this sleeper I need a few
* things, and these are it.
*/
struct timespec mSleepDelay;
uint8_t mHowSleepy;
static int32_t cSleepy[];
static uint8_t cSleepyMax;
/**
* This is the class reference to the logger for this class.
* It's pthread-aware, so we should be OK to use it by all the
* instances of this class.
*/
static log4cpp::Category & cLog;
};
template <class T, size_t aPowOf2Size> int32_t RingFIFO<T, aPowOf2Size>::cSleepy[] = { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 250000, 250000, 250000, 250000, 250000, 250000, 250000, 250000, 250000, 250000, 500000, 500000, 500000, 500000, 500000, 500000, 500000, 500000, 500000, 500000, 750000, 750000, 750000, 750000, 750000, 750000, 750000, 750000, 750000, 750000, 1000000, 1000000, 1000000, 1000000, 1000000, 1000000, 1000000, 1000000, 1000000, 1000000, 2500000, 2500000, 2500000, 2500000, 2500000, 2500000, 2500000, 2500000, 2500000, 2500000, 5000000, 5000000, 5000000, 5000000, 5000000, 10000000, 10000000, 10000000, 10000000, 10000000, 50000000, 100000000 };
template <class T, size_t aPowOf2Size> uint8_t RingFIFO<T, aPowOf2Size>::cSleepyMax = 41;
template <class T, size_t aPowOf2Size> log4cpp::Category & RingFIFO<T, aPowOf2Size>::cLog = log4cpp::Category::getInstance("RingFIFO<T, aPowOf2Size>");
#endif // __RINGFIFO_H
// vim: set ts=4:
@abnd90
Copy link

abnd90 commented Aug 7, 2014

In push, you fetch and add insertion point. And then check if inspnt + 1 == head. Say the CAS succeeds, the queue is full. Now, before the insPnt is decremented, this thread is scheduled out.

Another thread that calls push in this state, inspnt == head and CAS fails and you end up overwriting the current head. The increment of the insPnt, the check for the full condition and the decrement of insPnt must all be a single atomic transaction (or) under a mutex?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment