Skip to content

Instantly share code, notes, and snippets.

@tuan3w
Created November 26, 2013 09:27
Show Gist options
  • Save tuan3w/7655624 to your computer and use it in GitHub Desktop.
Save tuan3w/7655624 to your computer and use it in GitHub Desktop.
task_construct.c
#include <pthread.h>
#include <stdio.h>
#include <vector>
#include <unistd.h>
#include <stdlib.h>
// One unit of work for a worker thread: wrapfunc calls `func(param)` and then
// passes the returned value to `reduce`.
struct Job {
    void* (*func)(void*);   // work function, invoked with `param`
    void *param;            // opaque argument handed to `func`
    void (*reduce)(void*);  // receives the value returned by `func`
    bool used;              // set to true by wrapfunc when a worker claims the job
};
// Inclusive integer range [s, e] that a single job works on.
struct task_param {
    int s;  // first value of the range
    int e;  // last value of the range (inclusive)
};
// Not read or written by any function in the visible code.
bool finished = true;
// One job slot per worker: pool() stores a job at index i and the worker
// started with id i (wrapfunc) picks it up from the same index.
Job thread_jobs[100];
// `lock` is held by wrapfunc around the reduce call and the `total` decrement;
// it is initialised in main. `lock2` is declared but not used in the visible code.
pthread_mutex_t lock, lock2;
// th_status[i] is set to true by pool() before it starts worker i and set to
// false by wrapfunc when that worker leaves its loop.
// Note: `= {1}` makes only element 0 true; the other elements start as false.
bool th_status[100] = {1};
// Pending jobs: create_task pushes them, pool() pops them from the back.
std::vector<Job> stack;
// s: running result, updated by add(). n and nthreads are assigned in main.
long s = 0, n, nthreads;
// Number of jobs whose result has not been reduced yet: set by create_task,
// decremented by wrapfunc, polled by pool().
int total;
// Count of workers started by pool(); it is reset and incremented there and
// nothing in the visible code decrements it.
long aths; //number of thread created
void* wrapfunc(void* k) {
//maybe sleep ?
/*sleep(1);*/
/*printf("wrap funct call\n");*/
long id = (long) k;
printf("thread status: %d\n", th_status[id]);
printf("thread id %ld\n", id);
while (true) {
Job j = thread_jobs[id];
/*printf("status: + %d\n", j.used);*/
if (j.used)
break;
/*task_param *t = (task_param *)j.param;*/
/*printf("#task %d, %d\n", t->s, t->e);*/
thread_jobs[id].used = true;
void* r = j.func(j.param);
//reduce
pthread_mutex_lock(&lock);
j.reduce(r);
total --;
pthread_mutex_unlock(&lock);
/*sleep(1);*/
}
th_status[id] = false;
/*aths --;*/
printf("thread %ld die\n", id);
return NULL;
}
void* sum(void* t) {
/*printf("Sleep 2 s\n");*/
/*sleep(2);*/
task_param *task = (task_param *) t;
int start = task->s;
int end = task->e;
long s = 0;
for (int i = start; i <= end; i++) {
s += i;
}
/*printf("end func\n");*/
return (void*)s;
}
// Reduce callback: unpacks the long carried in `r` and adds it to the global
// accumulator `s`. wrapfunc calls it while holding `lock`.
void add(void *r) {
    const long partial = (long)r;
    s = s + partial;
}
void create_task(int n, int ntask, int nthreads) {
total = ntask;
int sz = n / ntask;
for (int i = 0; i < ntask; i++) {
Job j;
task_param *p = (task_param *) malloc(sizeof(task_param));
p->s = i * sz + 1;
if (i == ntask - 1)
p->e = n;
else
p->e = p->s + sz - 1;
j.param = (void* )p;
j.func = sum;
j.reduce = add;
j.used = false;
stack.push_back(j);
}
printf("Task size %ld\n", stack.size());
}
void pool(int nthreads) {
aths = 0;
//initialize with nthreads
for (int i = 0; i < nthreads; i++) {
pthread_t id;
if (stack.empty())
return; //no more task
Job j = stack.back();
thread_jobs[i] = j;
th_status[i] = true;
aths ++;
task_param *t = (task_param *) j.param;
printf("create threads, active threads: %ld\n", aths);
stack.pop_back();
pthread_create(&id, NULL, wrapfunc, (void *)i);
}
//pooling process
printf("pooling process... with %d process\n", nthreads);
/*printf("Stack.size() %d\n", stack.size());*/
pthread_t id;
while (total>0) {
//schedule here
//(dynamic,1) mode
for (int i = 0; i < nthreads; i++) {
Job t = thread_jobs[i];
if (thread_jobs[i].used) { //pool new task
if (stack.empty())
break;
Job j = stack.back();
thread_jobs[i] = j;
task_param *t = (task_param *) j.param;
/*printf("task remain: %d\n", stack.size());*/
stack.pop_back();
if (!th_status[i]) {
th_status[i] = true;
aths ++;
printf("create thread, active threads: %ld\n", aths);
pthread_create(&id, NULL, wrapfunc, (void *)i);
}
}
}
/*sleep(1);*/
}
printf("Done\n");
}
// Program entry: initialises the shared mutex, queues 100 jobs covering the
// range 1..n, runs them on a pool of `nthreads` workers and prints the value
// accumulated in `s`.
int main() {
    // wrapfunc locks `lock`, so it has to be ready before any worker starts
    pthread_mutex_init(&lock, NULL);

    // problem size and worker count
    nthreads = 4;
    n = 10000000;

    // split the range into jobs
    create_task(n, 100, nthreads);

    // run the jobs and wait for them to be reduced
    aths = 0;
    pool(nthreads);

    printf("sum : %ld\n", s);
    return 0;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment