Skip to content

Instantly share code, notes, and snippets.

@kumagi
Created November 2, 2015 13:33
Show Gist options
  • Star 4 You must be signed in to star a gist
  • Fork 2 You must be signed in to fork a gist
  • Save kumagi/b9a4715b1ce0dd511922 to your computer and use it in GitHub Desktop.
Save kumagi/b9a4715b1ce0dd511922 to your computer and use it in GitHub Desktop.
#define _GNU_SOURCE 1
#include <assert.h>
#include <errno.h>
#include <limits.h>
#include <pthread.h>
#include <sched.h> // sched_setaffinity
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/time.h>
#include <unistd.h> // usleep
#include <urcu-qsbr.h>
#define mb() asm volatile("" : : : "memory")
#define mf() asm volatile("mfence" : : : "memory")
//#define usleep(n)
#define CACHELINE 64
/* Thread-local id; work() copies it from its workingset and main() sets it to 1. */
__thread size_t TID;
/* Payload type carried by each stack node. */
typedef int Item;
/*
 * One element of the singly linked stack.
 *
 * The aligned attribute is written after the typedef name, so it applies to
 * the alias `Node`.
 * NOTE(review): push() obtains nodes from malloc(), which does not promise
 * CACHELINE alignment -- confirm whether that matters for this benchmark.
 */
typedef struct StackNode_ {
Item item; /* value stored in this node */
struct StackNode_* next; /* node directly beneath this one; NULL for the bottom node */
} Node __attribute__((aligned (CACHELINE)));
/*
 * Stack handle: a single pointer to the top node.
 *
 * `head` is NULL when the stack is empty and is swapped with
 * compare-and-swap by push() and pop().
 */
typedef struct {
  Node* head; /* current top of the stack, or NULL */
} lf_stack __attribute__ ((aligned (CACHELINE)));
/*
 * Put `stack` into the empty state.
 *
 * stack: handle to initialise; must point to valid storage.
 */
void stack_init(lf_stack* stack) {
  Node* const no_nodes = NULL;
  stack->head = no_nodes;
}
/*
 * Report whether the stack currently has no nodes.
 *
 * Returns 1 when `stack->head` is NULL, 0 otherwise. The value is a plain
 * read of `head` at the time of the call.
 */
int is_empty(lf_stack* stack) {
  if (stack->head != NULL) {
    return 0;
  }
  return 1;
}
/*
 * Free every node still linked from `stack` and leave the stack empty.
 *
 * The walk uses plain loads and free(), with no atomic operations or RCU
 * calls, so the caller must ensure no other thread is using the stack.
 * `head` is reset to NULL so the handle does not keep pointing at freed
 * memory.
 */
void stack_destroy(lf_stack* stack) {
  Node* cur = stack->head;
  stack->head = NULL;
  while (cur != NULL) {
    /* Read the link before the node is released. */
    Node* const below = cur->next;
    free(cur);
    cur = below;
  }
}
/*
 * Push `new_item` onto the top of `stack`.
 *
 * A new node is allocated and linked in with a compare-and-swap retry loop.
 * The function has no way to report failure, so an allocation failure
 * prints a diagnostic and aborts instead of dereferencing NULL.
 *
 * stack:    stack to push onto.
 * new_item: value to store in the new top node.
 */
void push(lf_stack* stack, Item new_item)
{
  /* Initialise the node. */
  Node* const new_node = malloc(sizeof *new_node);
  if (new_node == NULL) {
    perror("push: malloc");
    abort();
  }
  new_node->item = new_item;

  /* Update the top of the stack. */
  for (;;) {
    /* Announce a quiescent state on every attempt, before head is read. */
    rcu_quiescent_state();
    Node* const old_head = stack->head;
    new_node->next = old_head;
    /* Type-generic builtin: works for the pointer at any pointer width. */
    if (__sync_bool_compare_and_swap(&stack->head, old_head, new_node)) {
      return;
    }
    /* head changed under us; reload it and retry. */
  }
}
/*
 * Pop the top item of `stack` into `*result`.
 *
 * stack:  stack to pop from.
 * result: receives the popped item on success; left untouched when the
 *         stack is empty.
 *
 * Returns 1 when an item was popped, 0 when the stack was empty.
 *
 * The unlinked node is freed only after synchronize_rcu() returns.
 */
int pop(lf_stack* stack, Item* result)
{
  /* Take the item of the node at the top of the stack. */
  Node* old_head;
  for (;;) {
    /* Announce a quiescent state on every attempt, before head is read. */
    rcu_quiescent_state();
    old_head = stack->head;
    if (old_head == NULL) {
      return 0;
    }
    Node* const next = old_head->next;
    /* Type-generic builtin: works for the pointer at any pointer width. */
    if (__sync_bool_compare_and_swap(&stack->head, old_head, next)) {
      break;
    }
    /* head changed under us; reload it and retry. */
  }
  *result = old_head->item;
  /*
   * Wait for a grace period before freeing: another thread may have loaded
   * this node as head and still be about to read its `next` field.
   */
  synchronize_rcu();
  free(old_head);
  return 1;
}
/* ---------------------- */
/* Per-thread arguments: main() fills one in per worker and passes it to work(). */
typedef struct {
size_t tid; /* worker id; main() assigns i + 1, work() copies it into TID */
int num; /* how many items work() pushes and then pops */
lf_stack* stack; /* stack shared by all workers */
pthread_mutex_t* regist_lock; /* held around rcu_register_thread()/rcu_unregister_thread() */
pthread_barrier_t* barrier; /* waited on before and after the push/pop workload */
} workingset;
/*
 * Worker thread entry point.
 *
 * data: pointer to this thread's workingset.
 *
 * Pins the thread to CPU `TID % CORES`, registers it with RCU, then between
 * two barrier waits pushes `num` items onto the shared stack and pops `num`
 * items back. A failed pop aborts the process. Always returns NULL.
 */
void* work(void* data) {
  workingset* const ws = (workingset*)data;
  lf_stack* const shared = ws->stack;
  TID = ws->tid;

  /* Restrict this thread to one CPU chosen from its id. */
  cpu_set_t cpus;
  CPU_ZERO(&cpus);
  CPU_SET(TID % CORES, &cpus);
  if (sched_setaffinity(0, sizeof(cpus), &cpus) == -1) {
    perror("setaffinity:");
    exit(1);
  }

  /* RCU registration is serialised through the shared mutex. */
  pthread_mutex_lock(ws->regist_lock);
  rcu_register_thread();
  pthread_mutex_unlock(ws->regist_lock);

  /* Start line: released together with main() and the other workers. */
  pthread_barrier_wait(ws->barrier);

  /* Phase 1: push the values 0 .. num-1. */
  int pushed = 0;
  while (pushed < ws->num) {
    push(shared, pushed);
    ++pushed;
  }

  /* Phase 2: pop the same number of items; each pop must succeed. */
  int popped;
  for (popped = 0; popped < ws->num; ++popped) {
    int value = popped;
    if (pop(shared, &value) == 0) {
      printf("pop failed\n");
      abort();
    }
  }

  /*
   * Go offline for RCU before blocking on the barrier -- presumably so that
   * other threads' synchronize_rcu() calls need not wait for this thread.
   */
  rcu_thread_offline();

  /* Finish line. */
  pthread_barrier_wait(ws->barrier);

  pthread_mutex_lock(ws->regist_lock);
  rcu_unregister_thread();
  pthread_mutex_unlock(ws->regist_lock);
  return NULL;
}
/*
 * Print the stack from top to bottom on stdout as
 * "{d:<item>,<next pointer>}->" per node, followed by "0(NULL)".
 *
 * Debugging aid only; not thread safe, because the list is walked with
 * plain loads.
 */
void dump(const lf_stack* const stack) {
  const Node* ptr = stack->head;
  const int ret = 0; /* the trailing number printed is always 0 */
  while (ptr) {
    /* %p requires a void pointer argument. */
    printf("{d:%d,%p}->", ptr->item, (void*)ptr->next);
    fflush(stdout);
    ptr = ptr->next;
  }
  printf("%d(NULL)\n", ret);
}
/*
 * Current time of day from gettimeofday(), as seconds in a double.
 *
 * The microsecond field supplies the fractional part.
 */
double now(){
  struct timeval tv;
  gettimeofday(&tv, NULL);
  const double whole_seconds = (double)tv.tv_sec;
  const double fraction = (double)tv.tv_usec * 1e-6;
  return whole_seconds + fraction;
}
/*
 * Benchmark driver.
 *
 * Usage: <program> <thread count>
 *
 * Starts the requested number of worker threads, each pushing and then
 * popping `num` items on one shared stack. The time between the start and
 * finish barriers is measured, the stack is checked to be empty afterwards,
 * and the throughput is printed.
 *
 * Returns 0 on success and 1 on a usage or allocation error. Exits with
 * status 1 if a pthread object or thread cannot be created.
 */
int main(int argc, char** argv) {
  if (argc != 2) {
    printf("specify thread num\n");
    return 1;
  }
  const int num = 10000;

  /*
   * Parse the thread count strictly. The upper bound keeps
   * num * thread_max + 1 (printed below) within the range of int.
   */
  const int thread_limit = (INT_MAX - 1) / num;
  char* end = NULL;
  errno = 0;
  const long requested = strtol(argv[1], &end, 10);
  if (errno != 0 || end == argv[1] || *end != '\0' ||
      requested < 1 || requested > thread_limit) {
    printf("thread num must be an integer between 1 and %d\n", thread_limit);
    return 1;
  }
  const int thread_max = (int)requested;

  /* Heap arrays: the count comes from the command line, so avoid VLAs. */
  pthread_t* const thread = calloc((size_t)thread_max, sizeof *thread);
  workingset* const wk = calloc((size_t)thread_max, sizeof *wk);
  if (thread == NULL || wk == NULL) {
    perror("calloc");
    free(thread);
    free(wk);
    return 1;
  }

  lf_stack stack;
  stack_init(&stack);
  pthread_mutex_t regist_lock;
  pthread_barrier_t regist_barrier;
  // init shared data
  int rc = pthread_mutex_init(&regist_lock, NULL);
  if (rc != 0) {
    fprintf(stderr, "pthread_mutex_init failed (error %d)\n", rc);
    exit(1);
  }
  /* The barrier counts every worker plus this thread. */
  rc = pthread_barrier_init(&regist_barrier, NULL, (unsigned)thread_max + 1u);
  if (rc != 0) {
    fprintf(stderr, "pthread_barrier_init failed (error %d)\n", rc);
    exit(1);
  }

  int i;
  for (i = 0; i < thread_max; ++i) {
    wk[i].tid = (size_t)i + 1;
    wk[i].num = num;
    wk[i].stack = &stack;
    wk[i].regist_lock = &regist_lock;
    wk[i].barrier = &regist_barrier;
    mf();
    rc = pthread_create(&thread[i], NULL, work, &wk[i]);
    if (rc != 0) {
      /* A missing worker would leave everyone stuck on the barrier. */
      fprintf(stderr, "pthread_create failed (error %d)\n", rc);
      exit(1);
    }
  }

  /* Short pause before the clock starts, while workers head for the barrier. */
  usleep(5000);
  const double start = now();
  pthread_barrier_wait(&regist_barrier); /* start line */
  pthread_barrier_wait(&regist_barrier); /* finish line */
  const double finish = now();

  for (i = 0; i < thread_max; ++i) {
    pthread_join(thread[i], NULL);
  }
  pthread_barrier_destroy(&regist_barrier);
  pthread_mutex_destroy(&regist_lock);

  /* Every worker popped as many items as it pushed, so this pop must fail. */
  int result = 0;
  TID = 1;
  int should_fail = pop(&stack, &result);
  printf("stack is successfully empty:%d\n", should_fail);
  assert(should_fail == 0);
  assert(is_empty(&stack));
  stack_destroy(&stack);

  /* The pop count includes the extra failed pop above. */
  printf("push: %d pop:%d done.\n", num * thread_max, num * thread_max + 1);
  printf("%lf query / sec\n", thread_max * num / (finish - start));

  free(wk);
  free(thread);
  return 0;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment