Created
June 29, 2011 08:10
-
-
Save mopemope/1053395 to your computer and use it in GitHub Desktop.
lock-free queue and thread pool
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#ifndef COMMON_H | |
#define COMMON_H | |
//#ifdef DEVELOP | |
#define DEBUG(...) \ | |
do { \ | |
/*printf("%-22s%4u: ", __FILE__, __LINE__);*/ \ | |
/*printf("%-22s %-32s%4u: ", __FILE__, __func__, __LINE__);*/ \ | |
printf("%s%4u: ", __FILE__, __LINE__); \ | |
printf(__VA_ARGS__); \ | |
printf("\n"); \ | |
} while(0) | |
//#else | |
//#define DEBUG(...) do{}while(0) | |
//#endif | |
#endif |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#include "common.h" | |
#include "tq.h" | |
#define MB() __asm__ __volatile__("": : :"memory") | |
static inline void* tq_dequeue(thread_queue_t *tq); | |
static inline int | |
cas(pointer_t * addr, pointer_t oldp, pointer_t newp) | |
{ | |
char result; | |
__asm__ __volatile__( | |
"lock; cmpxchg8b %0; setz %1" | |
:"=m"(*addr),"=q"(result) | |
:"m"(*addr), "a"(oldp.count), "d"(oldp.ptr), "b"(newp.count), "c"(newp.ptr) | |
:"memory"); | |
return (int)result; | |
} | |
static inline node_t* | |
create_node(void *v) | |
{ | |
node_t *n; | |
n = (node_t *)malloc(sizeof(node_t)); | |
if(n == NULL){ | |
return NULL; | |
} | |
n->value = v; | |
n->next.ptr = NULL; | |
n->next.count = 0; | |
return n; | |
} | |
static inline void | |
free_node(node_t *node) | |
{ | |
free(node); | |
} | |
static inline queue_t* | |
init_queue(void) | |
{ | |
queue_t *q; | |
node_t *n; | |
q = malloc(sizeof(queue_t)); | |
if(q == NULL){ | |
return NULL; | |
} | |
n = create_node(NULL); | |
if(n == NULL){ | |
free(q); | |
return NULL; | |
} | |
q->head.ptr = q->tail.ptr = n; | |
return q; | |
} | |
static inline void | |
free_queue(queue_t *q) | |
{ | |
node_t *n; | |
n = q->head.ptr; | |
while((n = n->next.ptr) != NULL) { | |
free_node(n); | |
} | |
free(q); | |
} | |
static inline int | |
enqueue(queue_t *q, void *v){ | |
node_t *n; | |
pointer_t tail, next, tmp; | |
n = create_node(v); | |
if(n == NULL){ | |
return -1; | |
} | |
while(1){ | |
tail = q->tail; | |
next = tail.ptr->next; | |
MB(); | |
if(tail.count == q->tail.count && tail.ptr == q->tail.ptr) { | |
if(next.ptr == NULL){ | |
tmp.ptr = n; | |
tmp.count = next.count + 1; | |
MB(); | |
if(cas(&tail.ptr->next, next, tmp)){ | |
break; | |
} | |
}else{ | |
tmp.ptr = next.ptr; | |
tmp.count = tail.count + 1; | |
MB(); | |
cas(&q->tail, tail, tmp); | |
} | |
} | |
} | |
tmp.ptr = n; | |
tmp.count = tail.count + 1; | |
MB(); | |
cas(&q->tail, tail, tmp); | |
return 1; | |
} | |
static inline void* | |
dequeue(queue_t * q) | |
{ | |
pointer_t head, tail, next, tmp; | |
void *v = NULL; | |
while (1) { | |
head = q->head; | |
tail = q->tail; | |
next = head.ptr->next; | |
MB(); | |
if(head.count == q->head.count && head.ptr == q->head.ptr) { | |
if(head.ptr == tail.ptr){ | |
if(next.ptr == NULL) { | |
return NULL; | |
} | |
tmp.ptr = next.ptr; | |
tmp.count = head.count + 1; | |
MB(); | |
cas(&q->tail, tail, tmp); | |
}else{ | |
v = next.ptr->value; | |
tmp.ptr = next.ptr; | |
tmp.count = head.count + 1; | |
MB(); | |
if(cas(&q->head, head, tmp)) { | |
break; | |
} | |
} | |
} | |
} | |
free_node(head.ptr); | |
return v; | |
} | |
static void | |
process(void *data) | |
{ | |
//DEBUG("process thread:%p %d", (void *)pthread_self(), (int)data); | |
DEBUG("process %d", (int)data); | |
/* | |
struct timespec tc; | |
tc.tv_sec = 0; | |
tc.tv_nsec = rand(); | |
nanosleep(&tc, NULL); | |
*/ | |
} | |
pthread_mutex_t ready_mutex = PTHREAD_MUTEX_INITIALIZER; | |
pthread_cond_t ready_cond = PTHREAD_COND_INITIALIZER; | |
static inline void * | |
thread_function(void *args) | |
{ | |
thread_queue_t *tq; | |
void *data; | |
tq = (thread_queue_t *)args; | |
pthread_detach(pthread_self()); | |
pthread_cond_signal(&ready_cond); | |
while(1){ | |
DEBUG("wait thread:%p", (void *)pthread_self()); | |
pthread_mutex_lock(&tq->mutex); | |
while((data = tq_dequeue(tq)) == NULL){ | |
pthread_cond_wait(&tq->cond, &tq->mutex); | |
} | |
pthread_mutex_unlock(&tq->mutex); | |
DEBUG("resume thread:%p", (void *)pthread_self()); | |
//process | |
tq->worker(data); | |
while((data = tq_dequeue(tq)) != NULL){ | |
tq->worker(data); | |
} | |
} | |
return NULL; | |
} | |
static inline thread_queue_t* | |
init_tq(int size, runnable *worker) | |
{ | |
int i = 0; | |
thread_queue_t *tq; | |
tq = malloc(sizeof(thread_queue_t)); | |
tq->q = init_queue(); | |
pthread_mutex_init(&tq->mutex, NULL); | |
pthread_cond_init(&tq->cond, NULL); | |
tq->worker = worker; | |
tq->threads = malloc(sizeof(pthread_t) * size); | |
pthread_mutex_lock(&ready_mutex); | |
for(i = 0; i < size; i++){ | |
if(!pthread_create(&tq->threads[i], NULL, thread_function, tq)){ | |
pthread_cond_wait(&ready_cond, &ready_mutex); | |
}else{ | |
perror("error"); | |
} | |
} | |
pthread_mutex_unlock(&ready_mutex); | |
DEBUG("init tq %p", tq); | |
return tq; | |
} | |
static inline int | |
tq_enqueue(thread_queue_t *tq, void *data) | |
{ | |
int ret; | |
ret = enqueue(tq->q, data); | |
if(ret > 0){ | |
pthread_cond_broadcast(&tq->cond); | |
} | |
return ret; | |
} | |
static inline void* | |
tq_dequeue(thread_queue_t *tq) | |
{ | |
return dequeue(tq->q); | |
} | |
int | |
main(void) | |
{ | |
thread_queue_t *tq; | |
int i = 0; | |
tq = init_tq(128, process); | |
for(i = 0; i < 100000; i++){ | |
tq_enqueue(tq, (void*)i); | |
} | |
sleep(3); | |
for(i = 0; i < 100000; i++){ | |
tq_enqueue(tq, (void*)i); | |
} | |
sleep(10); | |
DEBUG("fin"); | |
return 0; | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#ifndef T_QUEUE_H | |
#define T_QUEUE_H | |
#include <stdlib.h> | |
#include <stdio.h> | |
#include <string.h> | |
#include <inttypes.h> | |
#include <unistd.h> | |
#include <time.h> | |
#include <pthread.h> | |
typedef struct { | |
unsigned int count; | |
struct _node_t *ptr; | |
} __attribute__ ((packed)) pointer_t; | |
typedef struct _node_t { | |
void *value; | |
pointer_t next; | |
} __attribute__ ((packed)) node_t; | |
typedef struct { | |
pointer_t head; | |
pointer_t tail; | |
} __attribute__ ((packed)) queue_t; | |
typedef struct { | |
queue_t *q; | |
pthread_mutex_t mutex; | |
pthread_cond_t cond; | |
pthread_t *threads; | |
} __attribute__ ((packed)) thread_queue_t; | |
#endif |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment