Created
September 18, 2012 14:39
-
-
Save atomd-zz/3743486 to your computer and use it in GitHub Desktop.
Lock-based & Lock-free RingBuffers
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#ifndef THE2SIN18_QUEUE_H | |
#define THE2SIN18_QUEUE_H | |
// Alignment, in bytes, requested by ALIGNED. Override with -DCACHELINE_SIZE=N.
#ifndef CACHELINE_SIZE
#define CACHELINE_SIZE 64
#endif
// ALIGNED asks the compiler to align the declaration it prefixes to
// CACHELINE_SIZE bytes. Defining NON_ALIGNED compiles the attribute out;
// ALIGNED must still expand to *something* in that case, otherwise every
// declaration that uses it fails to parse. An ALIGNED already provided by the
// build is left untouched.
#ifndef ALIGNED
#ifdef NON_ALIGNED
#define ALIGNED
#else
#define ALIGNED __attribute__((aligned(CACHELINE_SIZE)))
#endif
#endif
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <new>
#include <pthread.h>
#include <sched.h>
#include <stdint.h>
#include <unistd.h>
namespace the2sin18 { | |
// Scoped lock for a pthread mutex: the constructor locks, the destructor
// unlocks, so every exit path of the enclosing scope releases the mutex.
// The guard does not own the mutex; the caller must keep the pthread_mutex_t
// initialised and alive for as long as the guard exists.
struct mutex_guard
{
    explicit mutex_guard(pthread_mutex_t* mutex_ptr): mutex_ptr_(mutex_ptr)
    {
        pthread_mutex_lock(mutex_ptr_);
    }
    ~mutex_guard()
    {
        pthread_mutex_unlock(mutex_ptr_);
    }
private:
    pthread_mutex_t* mutex_ptr_;
    // Non-copyable (declared, never defined): a copy would unlock twice.
    mutex_guard(const mutex_guard&);
    mutex_guard& operator=(const mutex_guard&);
};

// Fixed-capacity FIFO ring buffer whose every operation runs under one
// pthread mutex.
//
// Template parameters:
//   T               element type. Elements live in malloc'd storage, are
//                   copied with plain assignment and are never constructed or
//                   destroyed, so T must be a trivially copyable type.
//   CAPACITY        number of slots. One slot always stays unused so that
//                   "empty" (pop == push) and "full" (push + 1 == pop) are
//                   distinguishable; at most CAPACITY - 1 elements are held.
//   POP_MAX_RETRIES how many times pop_front() yields and retries before
//                   giving up.
template<typename T, unsigned CAPACITY = 32 * 1024 * 1024, unsigned POP_MAX_RETRIES = 12>
struct mutex_queue
{
    typedef T value_type;

    // Allocates the slot array and initialises the mutex.
    // Throws std::bad_alloc if the slot array cannot be allocated.
    explicit mutex_queue():
        pop_index_(0),
        push_index_(0),
        data_(allocate_slots())
    {
        pthread_mutex_init(&mutex_, NULL);
    }

    ~mutex_queue()
    {
        pthread_mutex_destroy(&mutex_);
        free(data_);
    }

    // Copies the oldest element into *value_ptr and removes it.
    // Returns false, leaving *value_ptr untouched, when the queue is empty.
    bool try_pop_front(value_type* value_ptr)
    {
        mutex_guard lock(&mutex_);
        if (pop_index_ == push_index_) {
            return false;
        }
        *value_ptr = data_[pop_index_];
        pop_index_ = next_index(pop_index_);
        return true;
    }

    // Appends new_value. Returns false, storing nothing, when the queue is
    // full.
    bool try_push_back(const value_type new_value)
    {
        mutex_guard lock(&mutex_);
        const unsigned next_push = next_index(push_index_);
        if (next_push == pop_index_) {
            // Advancing would make the queue look empty, i.e. it is full.
            return false;
        }
        data_[push_index_] = new_value;
        push_index_ = next_push;
        return true;
    }

    // Tries to pop, yielding the CPU between attempts: POP_MAX_RETRIES + 1
    // attempts in total. Returns true exactly when an element was stored in
    // *value_ptr, including when the final attempt is the one that succeeds.
    bool pop_front(value_type* value_ptr)
    {
        for (unsigned attempt = 0; ; ++attempt) {
            if (try_pop_front(value_ptr)) {
                return true;
            }
            if (attempt == POP_MAX_RETRIES) {
                return false;
            }
            sched_yield();
        }
    }

    // Appends new_value, yielding the CPU until a slot becomes free.
    // Does not return while the queue stays full.
    void push_back(const value_type new_value)
    {
        while (!try_push_back(new_value)) {
            sched_yield();
        }
    }

    // Number of elements currently stored, read under the mutex.
    int size() const
    {
        mutex_guard lock(&mutex_);
        const unsigned count = push_index_ >= pop_index_
            ? push_index_ - pop_index_
            : CAPACITY - pop_index_ + push_index_;
        return static_cast<int>(count);
    }

private:
    // Successor of a slot index, wrapping to 0 at CAPACITY. The stored
    // indices therefore always stay inside [0, CAPACITY) and never overflow.
    static unsigned next_index(unsigned index)
    {
        return index + 1 == CAPACITY ? 0 : index + 1;
    }

    // malloc's the raw slot array; the slots are left uninitialised.
    static value_type* allocate_slots()
    {
        void* raw = malloc(sizeof(value_type) * CAPACITY);
        if (raw == NULL) {
            throw std::bad_alloc();
        }
        return static_cast<value_type*>(raw);
    }

    ALIGNED unsigned pop_index_;      // slot of the oldest element
    ALIGNED unsigned push_index_;     // slot the next push writes to
    ALIGNED value_type* const data_;  // CAPACITY slots, malloc'd
    // mutable so that the const size() can take the lock.
    ALIGNED mutable pthread_mutex_t mutex_;

    // Non-copyable (declared, never defined).
    mutex_queue(const mutex_queue&);
    mutex_queue& operator=(const mutex_queue&);
};
// Fixed-capacity FIFO ring buffer synchronised with GCC __sync compare-and-
// swap builtins instead of a mutex.
//
// Slot protocol: every slot holds either a tag or an encoded element.
//   EMPTY_TAG (0)  nothing stored
//   POP_TAG   (1)  marks the slot just before the oldest element
//   PUSH_TAG  (2)  marks the slot the next push writes to
//   otherwise      a stored element, encoded as value + RESERVED
// The constructor places POP_TAG in slot 0 and PUSH_TAG in slot 1. A pop
// moves POP_TAG one slot forward, a push moves PUSH_TAG one slot forward, and
// pop_cnt_ / push_cnt_ count how often each tag has moved.
//
// Requirements and limits that follow from that encoding:
//   * value_type must be an integral type the __sync builtins accept.
//   * A value v for which v + RESERVED equals 0, 1 or 2 (for signed types:
//     -3, -2, -1) is indistinguishable from a tag and must not be pushed.
//   * CAPACITY must be at least 2 (the constructor writes slots 0 and 1); two
//     slots are taken by the tags, so at most CAPACITY - 2 elements are held.
//   * NOTE(review): the counters are plain unsigned and are reduced with
//     % CAPACITY; after 2^32 operations the slot mapping only stays
//     continuous when CAPACITY is a power of two - confirm callers respect
//     this.
//
// POP_MAX_RETRIES is how many times pop_front() yields and retries before
// giving up.
template<typename T, unsigned CAPACITY = 32 * 1024 * 1024, unsigned POP_MAX_RETRIES = 12>
struct cas_queue
{
    typedef T value_type;

    // Allocates a zero-filled (all EMPTY_TAG) slot array and plants the two
    // tags. Throws std::bad_alloc if the array cannot be allocated.
    explicit cas_queue():
        pop_cnt_(0),
        push_cnt_(1),
        data_(allocate_slots())
    {
        data_[0] = POP_TAG;
        data_[1] = PUSH_TAG;
    }

    ~cas_queue()
    {
        free(data_);
    }

    // Removes the oldest element and stores it in *value_ptr.
    // Returns false when the counters show an empty queue. While another
    // thread is in the middle of moving POP_TAG the CAS fails and this
    // function loops until it either succeeds or sees the queue empty.
    bool try_pop_front(value_type* value_ptr)
    {
        value_type* const data = data_;
        while (true) {
            const unsigned pop_cnt = pop_cnt_;
            const unsigned push_cnt = push_cnt_;
            const unsigned pop_index = pop_cnt % CAPACITY;
            const unsigned push_index = push_cnt % CAPACITY;
            const unsigned pop_next_index = (pop_cnt + 1) % CAPACITY;
            if (pop_next_index == push_index) {
                // The slot after POP_TAG is the PUSH_TAG slot: nothing stored.
                return false;
            }
            value_type* const pop_ptr = data + pop_index;
            value_type* const pop_next_ptr = data + pop_next_index;
            // Read the candidate element before claiming the POP_TAG slot.
            const value_type pop_value = *pop_next_ptr;
            // First CAS claims the pop (POP_TAG -> EMPTY_TAG); second CAS
            // turns the element's slot into the new POP_TAG slot.
            if (__sync_bool_compare_and_swap(pop_ptr, POP_TAG, EMPTY_TAG) &&
                __sync_bool_compare_and_swap(pop_next_ptr, pop_value, POP_TAG)) {
                *value_ptr = pop_value - RESERVED;
                break;
            }
        }
        // Publish the move only after both slots have been updated.
        __sync_fetch_and_add(&pop_cnt_, 1);
        return true;
    }

    // Appends new_value. Returns false when the counters show a full queue.
    // While another thread is in the middle of moving PUSH_TAG the CAS fails
    // and this function loops until it either succeeds or sees the queue
    // full.
    bool try_push_back(const value_type new_value)
    {
        value_type* const data = data_;
        while (true) {
            const unsigned pop_cnt = pop_cnt_;
            const unsigned push_cnt = push_cnt_;
            const unsigned pop_index = pop_cnt % CAPACITY;
            const unsigned push_index = push_cnt % CAPACITY;
            const unsigned push_next_index = (push_cnt + 1) % CAPACITY;
            if (pop_index == push_next_index) {
                // PUSH_TAG would have to move onto the POP_TAG slot: full.
                return false;
            }
            value_type* const push_ptr = data + push_index;
            value_type* const push_next_ptr = data + push_next_index;
            // First CAS replaces PUSH_TAG with the encoded element; second
            // CAS plants PUSH_TAG in the following (empty) slot.
            if (__sync_bool_compare_and_swap(push_ptr, PUSH_TAG, new_value + RESERVED) &&
                __sync_bool_compare_and_swap(push_next_ptr, EMPTY_TAG, PUSH_TAG)) {
                break;
            }
        }
        // Publish the move only after both slots have been updated.
        __sync_fetch_and_add(&push_cnt_, 1);
        return true;
    }

    // Tries to pop, yielding the CPU between attempts: POP_MAX_RETRIES + 1
    // attempts in total. Returns true exactly when an element was stored in
    // *value_ptr, including when the final attempt is the one that succeeds.
    bool pop_front(value_type* value_ptr)
    {
        for (unsigned attempt = 0; ; ++attempt) {
            if (try_pop_front(value_ptr)) {
                return true;
            }
            if (attempt == POP_MAX_RETRIES) {
                return false;
            }
            sched_yield();
        }
    }

    // Appends new_value, yielding the CPU until a slot becomes free.
    // Does not return while the queue stays full.
    void push_back(const value_type new_value)
    {
        while (!try_push_back(new_value)) {
            sched_yield();
        }
    }

    // Element count derived from the two counters. The counters are read
    // without any synchronisation, so under concurrent use the result is only
    // a snapshot.
    int size() const
    {
        return (push_cnt_ - pop_cnt_ - 1) % CAPACITY;
    }

private:
    static const value_type EMPTY_TAG = 0;
    static const value_type POP_TAG = 1;
    static const value_type PUSH_TAG = 2;
    // Offset added to stored values so they do not collide with the tags.
    static const value_type RESERVED = 3;

    // calloc's the slot array; zero-filled means every slot is EMPTY_TAG.
    static value_type* allocate_slots()
    {
        void* raw = calloc(CAPACITY, sizeof(value_type));
        if (raw == NULL) {
            throw std::bad_alloc();
        }
        return static_cast<value_type*>(raw);
    }

    ALIGNED unsigned pop_cnt_;        // number of completed POP_TAG moves
    ALIGNED unsigned push_cnt_;       // 1 + number of completed PUSH_TAG moves
    ALIGNED value_type* const data_;  // CAPACITY slots, calloc'd

    // Non-copyable (declared, never defined).
    cas_queue(const cas_queue&);
    cas_queue& operator=(const cas_queue&);
};
} | |
#endif |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment