Skip to content

Instantly share code, notes, and snippets.

@mopemope
Created June 29, 2011 08:10
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save mopemope/1053395 to your computer and use it in GitHub Desktop.
Save mopemope/1053395 to your computer and use it in GitHub Desktop.
lock-free queue and thread pool
#ifndef COMMON_H
#define COMMON_H

#include <stdio.h> /* printf(), so this header is usable on its own */

/*
 * DEBUG(fmt, ...) - print one trace line to stdout.
 *
 * Output is "<file><line>: <formatted message>\n", where the line number is
 * right-aligned in a field of width 4. The arguments are passed straight to
 * printf(), so the first one must be a printf-style format string.
 *
 * The body is wrapped in do { } while (0) so the macro behaves like a single
 * statement (safe in un-braced if/else).
 *
 * The line is emitted with three separate printf() calls, so output from
 * concurrently running threads may interleave.
 *
 * Tracing is unconditionally enabled; there is currently no build switch
 * that compiles it out.
 */
#define DEBUG(...) \
    do { \
        /* __LINE__ is an int constant, hence %d rather than %u. */ \
        printf("%s%4d: ", __FILE__, __LINE__); \
        printf(__VA_ARGS__); \
        printf("\n"); \
    } while (0)

#endif
#include "common.h"
#include "tq.h"
/*
 * MB(): compiler-level memory barrier.
 * The asm template is empty, so no instruction is emitted; the "memory"
 * clobber only tells the compiler not to cache memory values in registers
 * or move memory accesses across this point.
 */
#define MB() __asm__ __volatile__("": : :"memory")
/* Forward declaration: thread_function() calls tq_dequeue() before its definition. */
static inline void* tq_dequeue(thread_queue_t *tq);
/*
 * Atomic 64-bit compare-and-swap on a {count, ptr} pair.
 *
 * If *addr still equals oldp, it is replaced by newp.
 *
 * addr  location to update
 * oldp  value the caller expects to find at *addr
 * newp  value to store when the expectation holds
 *
 * Returns non-zero when the swap happened, 0 otherwise.
 *
 * Implemented with "lock cmpxchg8b", which compares EDX:EAX with the 8-byte
 * memory operand and, on a match, stores ECX:EBX into it and sets ZF.
 * This is only correct where pointer_t is exactly 8 bytes with count in the
 * low half and ptr in the high half, i.e. 32-bit x86.
 */
static inline int
cas(pointer_t *addr, pointer_t oldp, pointer_t newp)
{
    char result;
    /*
     * On a failed compare the instruction overwrites EDX:EAX with the
     * current memory contents, so these registers must be declared as
     * read-write operands; input-only operands may not be modified.
     */
    unsigned int expected_count = oldp.count;
    struct _node_t *expected_ptr = oldp.ptr;

    __asm__ __volatile__(
        "lock; cmpxchg8b %0; setz %1"
        : "+m"(*addr), "=qm"(result), "+a"(expected_count), "+d"(expected_ptr)
        : "b"(newp.count), "c"(newp.ptr)
        : "memory", "cc");
    return (int)result;
}
/*
 * Allocate a queue node carrying `v`.
 *
 * The node's successor link starts out as {NULL, 0}.
 * Returns the new node, or NULL when malloc() fails.
 */
static inline node_t*
create_node(void *v)
{
    node_t *fresh = malloc(sizeof *fresh);

    if (!fresh) {
        return NULL;
    }

    fresh->next.count = 0;
    fresh->next.ptr = NULL;
    fresh->value = v;
    return fresh;
}
/*
 * Release a queue node. Thin wrapper over free(); the node must not be
 * referenced afterwards.
 */
static inline void
free_node(node_t *node)
{
free(node);
}
/*
 * Create an empty queue.
 *
 * The queue always contains one dummy node; head and tail both point at it
 * when the queue is empty.
 *
 * Returns the new queue, or NULL when an allocation fails (nothing is
 * leaked in that case).
 */
static inline queue_t*
init_queue(void)
{
    queue_t *q;
    node_t *dummy;

    q = malloc(sizeof *q);
    if (q == NULL) {
        return NULL;
    }

    dummy = create_node(NULL);
    if (dummy == NULL) {
        free(q);
        return NULL;
    }

    /*
     * Initialise the version counters as well as the pointers: enqueue()
     * and dequeue() read head.count / tail.count and feed them to cas(),
     * so they must not be left as indeterminate malloc() contents.
     */
    q->head.ptr = dummy;
    q->head.count = 0;
    q->tail.ptr = dummy;
    q->tail.count = 0;
    return q;
}
/*
 * Destroy a queue: free every node still linked from head (including the
 * dummy node) and then the queue structure itself.
 *
 * Values stored in the nodes are not freed; they belong to the caller.
 * Must not run concurrently with enqueue()/dequeue() on the same queue.
 * Passing NULL is a no-op.
 */
static inline void
free_queue(queue_t *q)
{
    node_t *n;

    if (q == NULL) {
        return;
    }

    n = q->head.ptr;
    while (n != NULL) {
        /* Fetch the successor before the node is released. */
        node_t *following = n->next.ptr;

        free_node(n);
        n = following;
    }
    free(q);
}
/*
 * Append `v` to the tail of the queue without taking a lock.
 *
 * Returns 1 on success, -1 when the node for `v` cannot be allocated.
 *
 * The loop has the shape of the Michael-Scott non-blocking queue: snapshot
 * the tail, then either link the new node behind it or help a lagging tail
 * forward, retrying until the link succeeds.
 *
 * NOTE(review): `tail = q->tail` is a plain struct copy, not an atomic load;
 * the re-check against q->tail below is the only guard against a torn or
 * stale snapshot.
 * NOTE(review): dequeue() releases nodes with free(), so `tail.ptr->next`
 * may touch a node another thread has already freed - confirm whether
 * that is acceptable for the intended use.
 */
static inline int
enqueue(queue_t *q, void *v){
node_t *n;
pointer_t tail, next, tmp;
n = create_node(v);
if(n == NULL){
return -1;
}
while(1){
/* Snapshot the tail and the link that follows it. */
tail = q->tail;
next = tail.ptr->next;
MB();
/* Only act if the tail did not change while the snapshot was taken. */
if(tail.count == q->tail.count && tail.ptr == q->tail.ptr) {
if(next.ptr == NULL){
/* Tail really is the last node: try to link the new node after it. */
tmp.ptr = n;
tmp.count = next.count + 1;
MB();
if(cas(&tail.ptr->next, next, tmp)){
break;
}
}else{
/* Tail is lagging behind the last node: try to advance it, then retry. */
tmp.ptr = next.ptr;
tmp.count = tail.count + 1;
MB();
cas(&q->tail, tail, tmp);
}
}
}
/*
 * The node is linked. Try to swing the tail to it; the result is ignored
 * because the lagging-tail branches here and in dequeue() advance it too.
 */
tmp.ptr = n;
tmp.count = tail.count + 1;
MB();
cas(&q->tail, tail, tmp);
return 1;
}
/*
 * Remove and return the value at the head of the queue without taking a lock.
 *
 * Returns the stored value, or NULL when the queue is empty. Because NULL
 * doubles as the "empty" result, a stored NULL value cannot be told apart
 * from an empty queue by the caller.
 *
 * On success the old dummy node is freed and the node that held the value
 * becomes the new dummy.
 *
 * NOTE(review): nodes are released with free() while other threads may
 * still hold snapshots pointing at them (see `head.ptr->next` below) -
 * confirm whether that is acceptable for the intended use.
 */
static inline void*
dequeue(queue_t * q)
{
pointer_t head, tail, next, tmp;
void *v = NULL;
while (1) {
/* Snapshot head, tail and the first real node. */
head = q->head;
tail = q->tail;
next = head.ptr->next;
MB();
/* Only act if the head did not change while the snapshot was taken. */
if(head.count == q->head.count && head.ptr == q->head.ptr) {
if(head.ptr == tail.ptr){
if(next.ptr == NULL) {
/* Head and tail both sit on the dummy and nothing follows: empty. */
return NULL;
}
/* A node is linked but the tail has not caught up yet: help advance it. */
tmp.ptr = next.ptr;
tmp.count = head.count + 1;
MB();
cas(&q->tail, tail, tmp);
}else{
/*
 * Read the value before moving head: once head has advanced, `next`
 * is the new dummy and a later dequeue may free it.
 */
v = next.ptr->value;
tmp.ptr = next.ptr;
tmp.count = head.count + 1;
MB();
if(cas(&q->head, head, tmp)) {
break;
}
}
}
}
/* This thread won the head swap, so it releases the old dummy node. */
free_node(head.ptr);
return v;
}
/*
 * Demo worker callback: trace the integer that main() smuggled through the
 * queue's void* payload.
 *
 * data  an integer value stored directly in the pointer (not a real address)
 */
static void
process(void *data)
{
    /* Go through intptr_t so the pointer-to-int conversion is well defined
     * even where pointers are wider than int. */
    DEBUG("process %d", (int)(intptr_t)data);
}
/*
 * Start-up handshake between init_tq() and the worker threads:
 * init_tq() waits on ready_cond (with ready_mutex held) after creating each
 * thread, and every worker signals ready_cond once it has started.
 */
pthread_mutex_t ready_mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t ready_cond = PTHREAD_COND_INITIALIZER;
static inline void *
thread_function(void *args)
{
thread_queue_t *tq;
void *data;
tq = (thread_queue_t *)args;
pthread_detach(pthread_self());
pthread_cond_signal(&ready_cond);
while(1){
DEBUG("wait thread:%p", (void *)pthread_self());
pthread_mutex_lock(&tq->mutex);
while((data = tq_dequeue(tq)) == NULL){
pthread_cond_wait(&tq->cond, &tq->mutex);
}
pthread_mutex_unlock(&tq->mutex);
DEBUG("resume thread:%p", (void *)pthread_self());
//process
tq->worker(data);
while((data = tq_dequeue(tq)) != NULL){
tq->worker(data);
}
}
return NULL;
}
/*
 * Create a thread pool backed by a lock-free queue.
 *
 * size    number of worker threads to start
 * worker  callback each thread runs on every dequeued item
 *
 * Returns the new pool, or NULL when `size` is negative or an allocation
 * fails. A thread that cannot be created is reported on stderr and skipped;
 * the pool is still returned with the threads that did start.
 *
 * After each successful pthread_create() the function waits on ready_cond
 * for the new thread to announce itself.
 */
static inline thread_queue_t*
init_tq(int size, runnable *worker)
{
    thread_queue_t *tq;
    int i;

    if (size < 0) {
        return NULL;
    }

    tq = malloc(sizeof *tq);
    if (tq == NULL) {
        return NULL;
    }

    tq->q = init_queue();
    if (tq->q == NULL) {
        free(tq);
        return NULL;
    }

    /* Ask for at least one slot so a NULL result always means failure. */
    tq->threads = malloc(sizeof *tq->threads * (size_t)(size > 0 ? size : 1));
    if (tq->threads == NULL) {
        free_queue(tq->q);
        free(tq);
        return NULL;
    }

    pthread_mutex_init(&tq->mutex, NULL);
    pthread_cond_init(&tq->cond, NULL);
    tq->worker = worker;

    pthread_mutex_lock(&ready_mutex);
    for (i = 0; i < size; i++) {
        int rc = pthread_create(&tq->threads[i], NULL, thread_function, tq);

        if (rc == 0) {
            /* Wait for the new worker's start-up signal. */
            pthread_cond_wait(&ready_cond, &ready_mutex);
        } else {
            /* pthread_create() returns the error number; it does not set
             * errno, so perror() would print an unrelated message. */
            fprintf(stderr, "error: %s\n", strerror(rc));
        }
    }
    pthread_mutex_unlock(&ready_mutex);

    DEBUG("init tq %p", (void *)tq);
    return tq;
}
/*
 * Submit one item to the pool and wake the workers.
 *
 * tq    the pool
 * data  item handed to the worker callback; NULL cannot be used because
 *       the workers treat a NULL dequeue result as "queue empty"
 *
 * Returns the result of enqueue(): 1 on success, -1 on allocation failure.
 */
static inline int
tq_enqueue(thread_queue_t *tq, void *data)
{
    int ret = enqueue(tq->q, data);

    if (ret > 0) {
        /*
         * Broadcast with tq->mutex held. A worker holds that mutex from its
         * "queue is empty" check until it is inside pthread_cond_wait(), so
         * taking it here ensures the wakeup cannot fall into that gap and be
         * missed.
         */
        pthread_mutex_lock(&tq->mutex);
        pthread_cond_broadcast(&tq->cond);
        pthread_mutex_unlock(&tq->mutex);
    }
    return ret;
}
/*
 * Take the next item from the pool's queue.
 * Returns whatever dequeue() returns for tq->q (NULL when it finds nothing).
 */
static inline void*
tq_dequeue(thread_queue_t *tq)
{
return dequeue(tq->q);
}
/*
 * Enqueue the integers 1..count as pointer-sized payloads.
 *
 * Numbering starts at 1 because a payload of 0 converts to NULL, which the
 * queue cannot distinguish from "empty". Stops at the first failed enqueue.
 */
static void
submit_batch(thread_queue_t *tq, int count)
{
    int i;

    for (i = 1; i <= count; i++) {
        if (tq_enqueue(tq, (void *)(intptr_t)i) < 0) {
            DEBUG("enqueue failed at %d", i);
            return;
        }
    }
}

/*
 * Demo driver: start 128 workers, feed them two batches of 100000 items
 * with pauses so the workers can drain the queue, then exit.
 *
 * Returns 0 on success, 1 when the pool cannot be created.
 */
int
main(void)
{
    thread_queue_t *tq = init_tq(128, process);

    if (tq == NULL) {
        DEBUG("init tq failed");
        return 1;
    }

    submit_batch(tq, 100000);
    sleep(3);   /* let the workers drain the first batch */

    submit_batch(tq, 100000);
    sleep(10);  /* workers are detached; give them time before exiting */

    DEBUG("fin");
    return 0;
}
#ifndef T_QUEUE_H
#define T_QUEUE_H
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <inttypes.h>
#include <unistd.h>
#include <time.h>
#include <pthread.h>

/*
 * Versioned pointer: a node pointer paired with a modification counter.
 * The pair is updated as one unit by the 8-byte compare-and-swap in tq.c,
 * which relies on `count` occupying the low half and `ptr` the high half,
 * so the layout is kept packed and the field order must not change.
 */
typedef struct {
    unsigned int count;
    struct _node_t *ptr;
} __attribute__ ((packed)) pointer_t;

/* Singly linked queue node: the stored value and the link to the next node. */
typedef struct _node_t {
    void *value;
    pointer_t next;
} __attribute__ ((packed)) node_t;

/* Queue: head points at a dummy node, tail at (or just behind) the last node. */
typedef struct {
    pointer_t head;
    pointer_t tail;
} __attribute__ ((packed)) queue_t;

/* Callback run by the worker threads on every dequeued item. */
typedef void runnable(void *data);

/*
 * Thread pool: a queue of pending items plus the threads that consume it.
 * Not packed: it holds pthread objects, which need their natural alignment,
 * and its layout is not relied on by any inline assembly.
 */
typedef struct {
    queue_t *q;             /* pending items */
    pthread_mutex_t mutex;  /* protects waiting on `cond` */
    pthread_cond_t cond;    /* signalled when an item has been enqueued */
    pthread_t *threads;     /* worker thread handles */
    runnable *worker;       /* callback invoked for each item */
} thread_queue_t;
#endif
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment