Skip to content

Instantly share code, notes, and snippets.

@kumagi
Created November 2, 2015 13:53
Show Gist options
  • Save kumagi/d259274270fdc1385f81 to your computer and use it in GitHub Desktop.
Save kumagi/d259274270fdc1385f81 to your computer and use it in GitHub Desktop.
#define _GNU_SOURCE 1
#include <assert.h>
#include <inttypes.h>
#include <limits.h>
#include <pthread.h>
#include <sched.h> // sched_setaffinity
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/time.h>
#include <unistd.h>
#include <urcu-qsbr.h>
/* Compiler-only barrier: an empty asm statement with a "memory" clobber. */
#define mb() asm volatile("" : : : "memory")
/* Full fence: emits the x86 `mfence` instruction, also with a "memory" clobber. */
#define mf() asm volatile("mfence" : : : "memory")
//#define usleep(n)
/* Alignment, in bytes, requested for the shared structures declared below. */
#define CACHELINE 64
/* Per-thread id, used as the index into wf_stack.states (slot 0 is never a real thread). */
__thread size_t TID;
/* (setq compile-command "gcc wfstack2.c -O4 -lpthread -lurcu-qsbr -ggdb -Wall -o wfstack -DNDEBUG -flto")
*/
/* Payload type stored in the stack. */
typedef int Item;

/*
 * One cell of the stack.  The typedef requests CACHELINE alignment.
 */
typedef struct node {
  Item* item;          /* heap-allocated payload; NULL when the node carries none */
  volatile int winner; /* tid whose operation installed this node and has not
                          been finished yet; 0 once help_finish() has run */
  struct node* next;   /* node below this one; NULL at the bottom */
  struct node* prev;   /* set by help_pop() to the head it replaced, else NULL */
} Node __attribute__ ((aligned (CACHELINE)));
/*
 * Allocate and initialise a stack node.
 *
 *   i  payload pointer stored in the node (may be NULL)
 *   w  thread id recorded as the node's `winner` (0 = no pending winner)
 *   n  node to link as `next` (may be NULL)
 *
 * `prev` starts out NULL.  A full fence is issued before returning so the
 * initialised fields are written before the caller publishes the pointer.
 * Aborts the process if the allocation fails; never returns NULL.
 *
 * NOTE(review): malloc() only guarantees fundamental alignment, whereas the
 * Node typedef asks for CACHELINE alignment -- consider posix_memalign().
 */
static Node* new_node(Item* const i, const size_t w, Node* const n) {
  Node* const construct = malloc(sizeof *construct);
  if (construct == NULL) {
    perror("malloc");
    abort();
  }
  construct->item = i;
  construct->winner = (int)w; /* winner is an int; thread ids are small */
  construct->next = n;
  construct->prev = NULL;
  mf();
  return construct;
}
/*
 * Per-thread operation descriptor.  push() and pop() allocate a fresh State
 * and store it in wf_stack.states[TID] to announce an operation.
 */
typedef struct {
int is_push; /* 1 for a push request, 0 for a pop request */
uint64_t phase; /* ordering stamp: max_phase() + 1 at the time of the request */
int pending; /* 1 while the request is outstanding; cleared by help_finish() */
Item* item; /* push: the payload to insert; pop: filled in by help_finish() */
} State __attribute__ ((aligned (CACHELINE)));
/*
 * The shared stack object.
 */
typedef struct wf_stack {
  Node* head;     /* current top node; updated with compare-and-swap */
  State** states; /* thread_max + 1 descriptors indexed by thread id;
                     slot 0 is NULL because tid 0 means "no thread" */
  int thread_max; /* highest valid thread id */
} wf_stack __attribute__ ((aligned (CACHELINE)));
/*
 * Allocate and initialise an operation descriptor.
 *
 *   p     phase number of the operation
 *   pend  1 if the operation is outstanding, 0 otherwise
 *   push  1 for a push, 0 for a pop
 *   i     payload pointer (may be NULL)
 *
 * A full fence is issued before returning so the fields are written before
 * the caller publishes the pointer.  Aborts the process if the allocation
 * fails; never returns NULL.
 */
static State* new_state(const uint64_t p,
                        const int pend,
                        const int push,
                        Item* const i) {
  State* const construct = malloc(sizeof *construct);
  if (construct == NULL) {
    perror("malloc");
    abort();
  }
  construct->phase = p;
  construct->pending = pend;
  construct->is_push = push;
  construct->item = i;
  mf();
  return construct;
}
/*
 * Debug helper: print one descriptor to stdout as
 * "{phase:N, pending|none, push|pop, VALUE}" and flush.
 * VALUE is the pointed-to item, or -1 when the descriptor has no item.
 */
void print_state(const State* s) {
  /* PRIu64 is the portable conversion for uint64_t; "%lu" is only correct
     where uint64_t happens to be unsigned long. */
  printf("{phase:%" PRIu64 ", %s, %s, %d}",
         s->phase,
         (s->pending == 1) ? "pending" : "none",
         (s->is_push == 1) ? "push" : "pop",
         (s->item == NULL) ? -1 : *s->item);
  fflush(stdout);
}
/* Stack interface used by the benchmark below. */
void stack_init(wf_stack* const stack, int threads);
void stack_destroy(wf_stack* const stack);
int is_empty(const wf_stack* const stack);
void push(wf_stack* const stack, const Item item);
int pop(wf_stack* const stack, Item* const result);
void dump(const wf_stack* const stack);
/* Internal helpers (file-local linkage). */
static uint64_t max_phase(const wf_stack* const stack);
static int scan(const wf_stack* const stack, uint64_t phase);
static int is_still_pending(const wf_stack* const stack, const int tid, const uint64_t phase);
static void help_push(wf_stack* const stack, const size_t tid, const uint64_t phase);
static void help_pop(wf_stack* const stack, const int tid, const uint64_t phase);
static void help(wf_stack* const stack, const uint64_t phase);
static void help_finish(wf_stack* const stack);
/*
 * Initialise `stack` for thread ids 1..threads.
 *
 * Installs an empty sentinel node as head and allocates one idle descriptor
 * (phase 0, not pending) per thread.  Slot 0 of the descriptor table stays
 * NULL.  Aborts the process on a negative thread count or on allocation
 * failure.
 */
void stack_init(wf_stack* const stack, int threads) {
  if (threads < 0) {
    fprintf(stderr, "stack_init: invalid thread count %d\n", threads);
    abort();
  }
  stack->head = new_node(NULL, 0, NULL);
  stack->states = malloc(sizeof *stack->states * ((size_t)threads + 1));
  if (stack->states == NULL) {
    perror("malloc");
    abort();
  }
  stack->states[0] = NULL;
  for (int tid = 1; tid <= threads; ++tid) {
    stack->states[tid] = new_state(0, 0, 0, NULL);
  }
  stack->thread_max = threads;
  mf(); /* make the initialised stack visible before any thread uses it */
}
/*
 * Return 1 when the head node is an idle sentinel: no payload, no pending
 * winner and nothing linked below it.  Return 0 otherwise.
 */
int is_empty(const wf_stack* const stack) {
  const Node* const top = stack->head;
  if (top->item != NULL) {
    return 0;
  }
  if (top->winner != 0) {
    return 0;
  }
  return top->next == NULL;
}
/*
 * Release one operation descriptor.  Despite the name, the argument is a
 * State, not a stack.  The payload referenced by target->item is not freed.
 */
void free_stack(State* target) {
  /* The push and pop cases used to be separate but identical branches. */
  free(target);
}
/*
 * Free everything owned by `stack`: every node reachable from head through
 * `next`, the payload each of those nodes still holds, the per-thread
 * descriptors and the descriptor table.  The wf_stack object itself is not
 * freed.  Must only be called once no other thread is using the stack.
 */
void stack_destroy(wf_stack* const stack) {
  Node* ptr = stack->head;
  while (ptr != NULL) {
    Node* const next = ptr->next;
    /* push() heap-allocates each payload and only pop() releases it, so
       payloads still linked here would otherwise leak.  free(NULL) is a
       no-op for the sentinel. */
    free(ptr->item);
    free(ptr);
    ptr = next;
  }
  stack->head = NULL;

  for (int i = 1; i <= stack->thread_max; ++i) {
    free(stack->states[i]);
  }
  free(stack->states);
  stack->states = NULL;
  stack->thread_max = 0;
}
/*
 * Return the largest phase currently published by any thread's descriptor
 * (0 if every descriptor is still at phase 0).
 */
static uint64_t max_phase(const wf_stack* const stack) {
  uint64_t highest = 0;
  for (int tid = 1; tid <= stack->thread_max; ++tid) {
    if (!(highest > stack->states[tid]->phase)) {
      highest = stack->states[tid]->phase;
    }
  }
  return highest;
}
/*
 * Return 1 if thread `tid` has an outstanding operation whose phase is not
 * newer than `phase`, 0 otherwise.
 */
static int is_still_pending(const wf_stack* const stack,
                            const int tid,
                            const uint64_t phase) {
  if (!stack->states[tid]->pending) {
    return 0;
  }
  return stack->states[tid]->phase <= phase;
}
/*
 * Pick the thread to help next: among threads whose operation is still
 * pending at `phase`, return the id of the one with the smallest phase
 * (lowest id wins ties).  Returns 0 when nothing is pending.
 */
static int scan(const wf_stack* const stack, uint64_t phase) {
  uint64_t lowest = UINT64_MAX;
  int candidate = 0;
  for (int tid = 1; tid <= stack->thread_max; ++tid) {
    if (stack->states[tid]->phase >= lowest) {
      continue;
    }
    if (!is_still_pending(stack, tid, phase)) {
      continue;
    }
    lowest = stack->states[tid]->phase;
    candidate = tid;
  }
  return candidate;
}
/*
 * Keep helping pending operations with phase <= `phase`, oldest first,
 * until scan() reports that none is left.
 */
static void help(wf_stack* const stack, const uint64_t phase) {
  for (;;) {
    const size_t target = scan(stack, phase);
    if (target == 0) {
      break;
    }
    if (!stack->states[target]->is_push) {
      help_pop(stack, target, stack->states[target]->phase);
    } else {
      help_push(stack, target, stack->states[target]->phase);
    }
  }
}
/*
 * Push a copy of `item` on behalf of the calling thread (identified by TID).
 *
 * The request is announced by publishing a new pending State with phase
 * max_phase() + 1 in stack->states[TID]; help() then runs with that phase.
 * The thread's previous State is freed after synchronize_rcu() returns.
 */
void push(wf_stack* const stack, const Item item) {
rcu_quiescent_state();
const uint64_t phase = max_phase(stack) + 1;
State* const old_state = stack->states[TID];
/* The previous operation of this thread must already be finished. */
assert(!old_state->pending);
Item* const push_item = (Item*)malloc(sizeof(Item));
mf();
*push_item = item; /* heap copy of the caller's value */
State* push_state = new_state(phase, 1, 1, push_item);
mf();
/* Publishing the descriptor makes the request visible to helpers. */
stack->states[TID] = push_state;
help(stack, phase);
//help_finish(stack);
/* Wait for a grace period before releasing the replaced descriptor. */
synchronize_rcu();
free(old_state);
}
/*
 * Try to complete the push announced by thread `tid` at `phase`.
 *
 * A node carrying that thread's item and tagged winner = tid is allocated
 * up front.  While the request is still pending, the function tries to
 * compare-and-swap the node in as the new head.  If the request stops being
 * pending before this caller's CAS succeeds, the spare node is freed.
 */
static void help_push(wf_stack* const stack,
const size_t tid,
const uint64_t phase) {
Node* new_head = new_node(stack->states[tid]->item, tid, NULL);
while (is_still_pending(stack, tid, phase)) {
rcu_quiescent_state();
Node* const old_head = stack->head;
const int old_winner = old_head->winner;
/* Retry if head changed between the two reads above. */
if (old_head != stack->head) {
continue;
}
if (old_winner == 0) {
/* Neutral state: the current head has no unfinished winner. */
new_head->next = old_head;
if (is_still_pending(stack, tid, phase)) {
if (__sync_bool_compare_and_swap_8(&stack->head,
(uint64_t)old_head,
(uint64_t)new_head)) {
/* From this moment new_head is observable by other threads. */
help_finish(stack);
return;
}
}
} else {
/* Another operation is half-done on the head; finish it first. */
help_finish(stack);
}
}
/* The push was already completed by another thread, so discard our node. */
free(new_head);
}
/*
 * Pop one element on behalf of the calling thread (identified by TID).
 *
 * The request is announced by publishing a new pending State with phase
 * max_phase() + 1 and a NULL item; help() then runs with that phase.
 * Afterwards the item pointer found in stack->states[TID] decides the result:
 *   non-NULL: the value is copied to *result, the item is freed, returns 1;
 *   NULL:     *result is left untouched, returns 0.
 * In both cases the thread's previous State is freed after synchronize_rcu().
 */
int pop(wf_stack* const stack, Item* const result) {
rcu_quiescent_state();
const uint64_t phase = max_phase(stack) + 1;
State* old_state = stack->states[TID];
/* The previous operation of this thread must already be finished. */
assert(!old_state->pending);
State* pop_state = new_state(phase, 1, 0, NULL);
/* Publishing the descriptor makes the request visible to helpers. */
stack->states[TID] = pop_state;
help(stack, phase);
//help_finish(stack);
Item* const got_item = stack->states[TID]->item;
if (got_item != NULL) {
*result = *got_item;
synchronize_rcu();
free(old_state);
free(got_item);
return 1;
} else {
synchronize_rcu();
free(old_state);
return 0;
}
}
/*
 * Try to complete the pop announced by thread `tid` at `phase`.
 *
 * A replacement head tagged winner = tid is allocated up front.  While the
 * request is still pending, the function points the replacement's `prev` at
 * the current head, copies item/next from the node below the current head
 * (if there is one), and tries to compare-and-swap the replacement in.
 * On a successful CAS it calls help_finish(), waits for a grace period and
 * frees both the old head and the node below it.  If the request stops being
 * pending before this caller's CAS succeeds, the spare replacement is freed.
 */
void help_pop(wf_stack* const stack,
const int tid,
const uint64_t phase) {
Node* new_head = new_node(NULL, tid, NULL);
while (is_still_pending(stack, tid, phase)) {
rcu_quiescent_state();
Node* const old_head = stack->head;
Node* const next = old_head->next;
int const old_winner = old_head->winner;
/* NOTE(review): old_state and old_item are not used below. */
State* old_state = stack->states[tid];
Item* old_item = old_state->item;
/* Retry if head changed while the fields above were being read. */
if (old_head != stack->head) {
continue;
}
if (old_winner == 0) {
if (is_still_pending(stack, tid, phase)) {
/* help_finish() reads the popped payload through prev->item. */
new_head->prev = old_head;
if (next) {
/* The replacement takes over the payload and link of the node below. */
new_head->item = next->item;
new_head->next = next->next;
}
//old_state->item = old_head->item;
if (__sync_bool_compare_and_swap_8(&stack->head, (uint64_t)old_head, (uint64_t)new_head)) {
help_finish(stack);
synchronize_rcu();
if (next) {
free(next);
}
free(old_head);
return;
}
}
} else {
/* Another operation is half-done on the head; finish it first. */
help_finish(stack);
}
}
/* We did not install our node, so throw it away. */
free(new_head);
}
/*
 * Finish the operation of the thread recorded in head->winner, if any.
 *
 * For a pop, the popped payload pointer (head->prev->item) is stored in the
 * winner's State.  Then the State's pending flag is cleared and, after a
 * fence, head->winner is reset to 0.
 */
void help_finish(wf_stack* const stack) {
Node* const head = stack->head;
const int winner = head->winner;
/* states[0] is NULL; it is only dereferenced after the winner != 0 check. */
State* const state = stack->states[winner];
/* If head has changed, that winner has already been taken care of. */
if (head != stack->head) {
return;
}
if (winner == 0) {
return;
}
if (!state->is_push) {
state->item = head->prev->item;
}
mf();
state->pending = 0;
mf();
head->winner = 0;
}
/* ---------------------- */
/* Arguments handed to each benchmark thread (see work()). */
typedef struct {
size_t tid; /* thread id, copied into the thread-local TID */
int num; /* number of pushes, then pops, the thread performs */
wf_stack* stack; /* the shared stack under test */
pthread_mutex_t* regist_lock; /* held around rcu_register_thread()/rcu_unregister_thread() */
pthread_barrier_t* barrier; /* waited on before and after the workload */
} workingset;
/*
 * Benchmark thread body.  `data` points to this thread's workingset.
 *
 * Pins the thread to CPU (TID % cores), registers it with RCU, waits on the
 * barrier, performs `num` pushes followed by `num` pops (aborting if a pop
 * fails), goes RCU-offline, waits on the barrier again and unregisters.
 * Always returns NULL.
 */
void* work(void* data) {
  workingset* my_ws = (workingset*)data;
  wf_stack* stack = my_ws->stack;
  TID = my_ws->tid;

  /* CORES may be supplied with -DCORES=<n>; nothing in this file defines it,
     so fall back to the number of online processors when it is absent. */
#ifdef CORES
  const long cores = CORES;
#else
  long cores = sysconf(_SC_NPROCESSORS_ONLN);
  if (cores < 1) {
    cores = 1;
  }
#endif

  cpu_set_t mask;
  CPU_ZERO(&mask);
  CPU_SET(TID % (size_t)cores, &mask);
  if (sched_setaffinity(0, sizeof(mask), &mask) == -1) {
    perror("setaffinity:");
    exit(1);
  }

  pthread_mutex_lock(my_ws->regist_lock);
  rcu_register_thread();
  pthread_mutex_unlock(my_ws->regist_lock);

  /* First rendezvous: main starts timing before it joins this barrier. */
  pthread_barrier_wait(my_ws->barrier);
  assert(stack->head);

  int i;
  for (i = 0; i < my_ws->num; ++i) {
    push(stack, i);
  }
  for (i = 0; i < my_ws->num; ++i) {
    int result = i;
    int success = pop(stack, &result);
    if (!success) {
      printf("pop failed\n");
      abort();
    }
  }

  /* Go offline so that waiting on the barrier cannot stall grace periods
     of threads that are still working. */
  rcu_thread_offline();
  pthread_barrier_wait(my_ws->barrier);

  pthread_mutex_lock(my_ws->regist_lock);
  rcu_unregister_thread();
  pthread_mutex_unlock(my_ws->regist_lock);
  return NULL;
}
/*
 * Debug helper: print the stack from head downwards as
 * "{d:<value>,w:<winner>,<next pointer>}->" per node, stopping at the first
 * node without a payload, followed by "0(NULL)".
 * Not thread safe.
 */
void dump(const wf_stack* const stack) {
  const int ret = 0; /* always printed as 0 in the trailer */
  for (const Node* ptr = stack->head;
       ptr != NULL && ptr->item != NULL;
       ptr = ptr->next) {
    /* %p requires a void* argument. */
    printf("{d:%d,w:%d,%p}->", *ptr->item, ptr->winner, (void*)ptr->next);
    fflush(stdout);
  }
  printf("%d(NULL)\n", ret);
}
/* Current wall-clock time from gettimeofday(), in seconds as a double. */
double now(){
  struct timeval tv;
  gettimeofday(&tv, NULL);
  const double whole_seconds = (double)tv.tv_sec;
  const double fraction = (double)tv.tv_usec * 1e-6;
  return whole_seconds + fraction;
}
/*
 * Benchmark driver.
 *
 * Usage: <program> <thread count>
 *
 * Starts the requested number of worker threads, each doing 10000 pushes
 * followed by 10000 pops on one shared stack, times the interval between
 * the two barrier rendezvous, then checks that one more pop fails and that
 * the stack is empty.  Returns 0 on success, 1 on a usage error.
 */
int main(int argc, char** argv) {
  if (argc != 2) {
    printf("specify thread num\n");
    return 1;
  }

  const int num = 10000;
  /* Upper bound keeps num * thread_max + 1 (printed below) within int. */
  const long thread_limit = (INT_MAX - 1) / num;
  char* end = NULL;
  const long requested = strtol(argv[1], &end, 10);
  if (end == argv[1] || *end != '\0' ||
      requested < 1 || requested > thread_limit) {
    fprintf(stderr, "thread num must be an integer between 1 and %ld\n",
            thread_limit);
    return 1;
  }
  const int thread_max = (int)requested;

  /* Heap allocation instead of VLAs: the count comes from the command line. */
  pthread_t* const thread = calloc((size_t)thread_max, sizeof *thread);
  workingset* const wk = calloc((size_t)thread_max, sizeof *wk);
  if (thread == NULL || wk == NULL) {
    perror("calloc");
    return 1;
  }

  wf_stack stack;
  stack_init(&stack, thread_max);
  pthread_mutex_t regist_lock;
  pthread_barrier_t regist_barrier;

  // init shared data
  pthread_mutex_init(&regist_lock, NULL);
  /* thread_max workers plus this thread meet at the barrier. */
  const int barrier_rc =
      pthread_barrier_init(&regist_barrier, NULL, (unsigned)thread_max + 1u);
  if (barrier_rc != 0) {
    fprintf(stderr, "pthread_barrier_init failed: %d\n", barrier_rc);
    return 1;
  }

  int i;
  for (i = 0; i < thread_max; ++i) {
    wk[i].tid = i + 1;
    wk[i].num = num;
    wk[i].stack = &stack;
    wk[i].regist_lock = &regist_lock;
    wk[i].barrier = &regist_barrier;
    mf();
    const int rc = pthread_create(&thread[i], NULL, work, &wk[i]);
    if (rc != 0) {
      /* The barrier could never be completed with a worker missing, so
         give up instead of hanging. */
      fprintf(stderr, "pthread_create failed: %d\n", rc);
      exit(1);
    }
  }

  usleep(5000);
  const double start = now();
  pthread_barrier_wait(&regist_barrier);
  pthread_barrier_wait(&regist_barrier);
  const double finish = now();

  for (i = 0; i < thread_max; ++i) {
    pthread_join(thread[i], NULL);
  }
  pthread_barrier_destroy(&regist_barrier);
  pthread_mutex_destroy(&regist_lock);

  /* pop() calls rcu_quiescent_state(), which must run on a thread that is
     registered with RCU, so register this thread around the final check. */
  rcu_register_thread();
  int result = 0;
  TID = 1;
  int should_fail = pop(&stack, &result);
  rcu_unregister_thread();

  printf("stack is successfully empty:%d\n", should_fail);
  assert(should_fail == 0);
  assert(is_empty(&stack));
  stack_destroy(&stack);

  printf("push: %d pop:%d done.\n", num * thread_max, num * thread_max + 1);
  printf("%lf query / sec\n", thread_max * num / (finish - start));

  free(wk);
  free(thread);
  return 0;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment