Skip to content

Instantly share code, notes, and snippets.

@atomd-zz
Created September 18, 2012 14:39
Show Gist options
  • Save atomd-zz/3743486 to your computer and use it in GitHub Desktop.
Save atomd-zz/3743486 to your computer and use it in GitHub Desktop.
Lock-based & Lock-free RingBuffers
#ifndef THE2SIN18_QUEUE_H
#define THE2SIN18_QUEUE_H
// Cache-line size used for member alignment; may be overridden before
// including this header.
#ifndef CACHELINE_SIZE
#define CACHELINE_SIZE 64
#endif
// ALIGNED places a member on its own cache-line boundary. Defining
// NON_ALIGNED turns the annotation off; in that case ALIGNED must still
// expand (to nothing), otherwise every `ALIGNED <member>;` declaration in
// this header fails to compile.
#ifndef NON_ALIGNED
#define ALIGNED __attribute__((aligned(CACHELINE_SIZE)))
#else
#define ALIGNED
#endif
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <new>
#include <pthread.h>
#include <sched.h>
#include <stdint.h>
#include <unistd.h>
namespace the2sin18 {
// Scoped lock for a pthread mutex: the constructor locks the mutex it is
// handed and the destructor unlocks that same mutex.
struct mutex_guard
{
private:
    // Copy operations are declared private and not defined here, so a guard
    // cannot be copied and one lock is paired with exactly one unlock.
    mutex_guard(const mutex_guard&);
    mutex_guard& operator=(const mutex_guard&);

    // The mutex held for the lifetime of this guard (not owned).
    pthread_mutex_t* held_;

public:
    // Locks *mutex_ptr; the return value of pthread_mutex_lock is ignored.
    explicit mutex_guard(pthread_mutex_t* mutex_ptr)
        : held_(mutex_ptr)
    {
        pthread_mutex_lock(mutex_ptr);
    }

    // Unlocks the mutex that was locked in the constructor.
    ~mutex_guard()
    {
        pthread_mutex_unlock(held_);
    }
};
template<typename T, unsigned CAPACITY = 32 * 1024 * 1024, unsigned POP_MAX_RETRIES = 12>
struct mutex_queue
{
typedef T value_type;
explicit mutex_queue():
pop_index_(0),
push_index_(0),
data_(static_cast<value_type*>(malloc(CAPACITY * sizeof(value_type))))
{
pthread_mutex_init(&mutex_, NULL);
}
~mutex_queue()
{
pthread_mutex_destroy(&mutex_);
free(data_);
}
bool try_pop_front(value_type* value_ptr)
{
mutex_guard lock(&mutex_);
if (pop_index_ == push_index_) {
return false;
}
*value_ptr = data_[pop_index_++ % CAPACITY];
return true;
}
bool try_push_back(const value_type new_value)
{
mutex_guard lock(&mutex_);
if (pop_index_ == push_index_ + 1) {
return false;
}
data_[push_index_++ % CAPACITY] = new_value;
return true;
}
bool pop_front(value_type* value_ptr)
{
unsigned counter = 0;
while (!try_pop_front(value_ptr) && counter++ < POP_MAX_RETRIES) {
sched_yield();
}
return counter < POP_MAX_RETRIES;
}
void push_back(const value_type new_value)
{
while (!try_push_back(new_value)) {
sched_yield();
}
}
int size() const
{
return (push_index_ - pop_index_) % CAPACITY;
}
private:
ALIGNED unsigned pop_index_;
ALIGNED unsigned push_index_;
ALIGNED value_type* const data_;
ALIGNED pthread_mutex_t mutex_;
mutex_queue(const mutex_queue&);
mutex_queue& operator=(const mutex_queue&);
};
// Bounded FIFO ring buffer synchronised with GCC __sync compare-and-swap
// builtins instead of a mutex.
//
// Every slot of the array holds either a marker or a payload:
//   EMPTY_TAG (0) - free slot (calloc() zero-fills the array),
//   POP_TAG   (1) - slot just before the next element to pop,
//   PUSH_TAG  (2) - slot the next push will fill,
//   otherwise     - a stored value, encoded as value + RESERVED.
//
// Template parameters:
//   T               - element type; it must be an integer type usable with
//                     the __sync builtins, and values whose encoding
//                     (value + RESERVED) would collide with a marker cannot
//                     be stored.
//   CAPACITY        - number of slots; must be at least 2. Two slots are
//                     always occupied by the POP/PUSH markers, so at most
//                     CAPACITY - 2 elements are held at once.
//   POP_MAX_RETRIES - number of times pop_front() yields and retries on an
//                     empty queue before giving up.
template<typename T, unsigned CAPACITY = 32 * 1024 * 1024, unsigned POP_MAX_RETRIES = 12>
struct cas_queue
{
    typedef T value_type;

    // Allocates the zero-filled slot array and plants the initial markers.
    // Throws std::bad_alloc if the array cannot be allocated.
    explicit cas_queue():
        pop_cnt_(0),
        push_cnt_(1),
        data_(static_cast<value_type*>(calloc(CAPACITY, sizeof(value_type))))
    {
        if (data_ == NULL) {
            throw std::bad_alloc();
        }
        data_[0] = POP_TAG;
        data_[1] = PUSH_TAG;
    }

    ~cas_queue()
    {
        free(data_);
    }

    // Removes the oldest element into *value_ptr.
    // Returns false when the counters show the queue as empty; otherwise it
    // retries the CAS pair until it wins and returns true.
    //
    // NOTE(review): the two CAS operations are not atomic as a pair. If the
    // first succeeds and the second fails, nothing here restores POP_TAG.
    // Whether that can occur depends on how many consumers run concurrently -
    // confirm against the intended usage.
    bool try_pop_front(value_type* value_ptr)
    {
        value_type* const data = data_;
        while (true) {
            const unsigned pop_cnt = pop_cnt_;
            const unsigned push_cnt = push_cnt_;
            const unsigned pop_index = pop_cnt % CAPACITY;
            const unsigned push_index = push_cnt % CAPACITY;
            const unsigned pop_next_index = (pop_cnt + 1) % CAPACITY;
            // The slot after the pop marker is the push marker: nothing stored.
            if (pop_next_index == push_index) {
                return false;
            }
            value_type* const pop_ptr = data + pop_index;
            value_type* const pop_next_ptr = data + pop_next_index;
            const value_type pop_value = *pop_next_ptr;
            // Release the current marker slot, then turn the slot holding the
            // value into the new pop marker.
            if (__sync_bool_compare_and_swap(pop_ptr, POP_TAG, EMPTY_TAG) &&
                __sync_bool_compare_and_swap(pop_next_ptr, pop_value, POP_TAG)) {
                *value_ptr = pop_value - RESERVED;
                break;
            }
        }
        // The counter is published only after the markers have moved.
        __sync_fetch_and_add(&pop_cnt_, 1);
        return true;
    }

    // Appends new_value.
    // Returns false when the counters show the queue as full; otherwise it
    // retries the CAS pair until it wins and returns true.
    //
    // NOTE(review): as in try_pop_front, a successful first CAS followed by a
    // failed second CAS is not rolled back - confirm this cannot happen for
    // the intended producer/consumer configuration.
    bool try_push_back(const value_type new_value)
    {
        value_type* const data = data_;
        while (true) {
            const unsigned pop_cnt = pop_cnt_;
            const unsigned push_cnt = push_cnt_;
            const unsigned pop_index = pop_cnt % CAPACITY;
            const unsigned push_index = push_cnt % CAPACITY;
            const unsigned push_next_index = (push_cnt + 1) % CAPACITY;
            // The slot that would receive the push marker is the pop marker: full.
            if (pop_index == push_next_index) {
                return false;
            }
            value_type* const push_ptr = data + push_index;
            value_type* const push_next_ptr = data + push_next_index;
            // Replace the push marker with the encoded value, then claim the
            // following free slot as the new push marker.
            if (__sync_bool_compare_and_swap(push_ptr, PUSH_TAG, new_value + RESERVED) &&
                __sync_bool_compare_and_swap(push_next_ptr, EMPTY_TAG, PUSH_TAG)) {
                break;
            }
        }
        // The counter is published only after the markers have moved.
        __sync_fetch_and_add(&push_cnt_, 1);
        return true;
    }

    // Tries to pop, yielding the CPU between attempts. Makes at most
    // POP_MAX_RETRIES + 1 attempts and returns true as soon as one succeeds;
    // returns false if every attempt found the queue empty.
    bool pop_front(value_type* value_ptr)
    {
        for (unsigned attempt = 0; ; ++attempt) {
            // Report success directly: deriving the result from the retry
            // counter would misreport a pop that succeeds on the last attempt.
            if (try_pop_front(value_ptr)) {
                return true;
            }
            if (attempt >= POP_MAX_RETRIES) {
                return false;
            }
            sched_yield();
        }
    }

    // Appends new_value, yielding and retrying for as long as the queue is full.
    void push_back(const value_type new_value)
    {
        while (!try_push_back(new_value)) {
            sched_yield();
        }
    }

    // Element count derived from the two counters. They are read without any
    // synchronisation, so the result is only a snapshot.
    int size() const
    {
        return (push_cnt_ - pop_cnt_ - 1) % CAPACITY;
    }

private:
    // Compile-time guard: the constructor writes data_[0] and data_[1], and
    // the index arithmetic divides by CAPACITY.
    typedef char capacity_must_be_at_least_two[CAPACITY >= 2 ? 1 : -1];

    static const value_type EMPTY_TAG = 0;
    static const value_type POP_TAG = 1;
    static const value_type PUSH_TAG = 2;
    static const value_type RESERVED = 3; // offset added to stored values

    ALIGNED unsigned pop_cnt_;        // number of completed pops
    ALIGNED unsigned push_cnt_;       // number of completed pushes + 1
    ALIGNED value_type* const data_;  // calloc'ed array of CAPACITY slots

    // Non-copyable: declared private and not defined here.
    cas_queue(const cas_queue&);
    cas_queue& operator=(const cas_queue&);
};
}
#endif
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment