Skip to content

Instantly share code, notes, and snippets.

@ps
Last active November 30, 2016 16:14
Show Gist options
  • Save ps/f3f5566b66f6835e748904779c8a7222 to your computer and use it in GitHub Desktop.
Save ps/f3f5566b66f6835e748904779c8a7222 to your computer and use it in GitHub Desktop.
Fun with locks and condition variables
/*
 * Per-worker state. launch_master_node() passes the address of one of these
 * to pthread_create(), and worker_node() receives it as its argument.
 *
 * The mutex and condition variable are stored by value inside the struct, so
 * master and worker must both reach them through a pointer to the SAME
 * WorkerParams. POSIX leaves the behavior of a copied pthread_mutex_t or
 * pthread_cond_t undefined, so never assign them to a local by value.
 */
typedef struct WorkerParams {
int thread_id; /* worker identifier; presumably set by init_worker_param -- confirm */
Log * log; /* not referenced by the code visible here */
Jobs * jobs; /* job queue; read and modified only while jobs_lock is held */
pthread_mutex_t jobs_lock; /* guards *jobs */
pthread_cond_t work_added; /* broadcast by the master after it queues work or sets terminate */
} WorkerParams;
/* Defined later in this file; declared here so the call below has a prototype. */
void launch_master_node(int num_workers);

/*
 * Program entry point: runs the master node with three worker threads.
 *
 * Returns 0 after launch_master_node() returns. The standard-conforming
 * signature is int main(void); `void main()` is not portable.
 */
int main(void) {
    launch_master_node(3);
    return 0;
}
void launch_master_node(int num_workers) {
pthread_t * worker = (pthread_t *)malloc(sizeof(pthread_t) * num_workers);
WorkerParams * worker_params = (WorkerParams *)malloc(sizeof(WorkerParams) * num_workers);
for(i = 0; i < num_workers; i++) {
// the initialization function is 100% fine, no errors
init_worker_param(&worker_params[i], i);
pthread_create(&worker[i], NULL, &worker_node, (void *)&worker_params[i]);
}
for(i = 0; i < num_workers; i++) {
pthread_mutex_lock(&worker_params[i].jobs_lock);
Jobs * jobs = worker_params[i].jobs;
add_job(jobs, &dummy_job);
jobs->terminate = TRUE;
pthread_cond_broadcast(&worker_params[i].work_added);
pthread_mutex_unlock(&worker_params[i].jobs_lock);
}
// join on threads
for(i = 0; i < num_workers; i++) {
pthread_join(worker[i], NULL);
}
}
/*
 * Worker thread entry point.
 *
 * params: pointer to this worker's WorkerParams (passed via pthread_create).
 * Returns NULL as the thread's exit value.
 *
 * Loop: sleep until the queue is non-empty or terminate is set, take one job
 * while holding the lock, run it with the lock released, and exit once the
 * queue has been drained and terminate was set.
 */
void *worker_node(void *params) {
    WorkerParams *my_params = params;

    /*
     * Use the mutex and condition variable IN PLACE. Copying them into
     * locals by value would make this thread lock and wait on private
     * copies: no mutual exclusion with the master, and the master's
     * broadcast on the original condition variable would never wake us.
     */
    pthread_mutex_t *jobs_lock = &my_params->jobs_lock;
    pthread_cond_t *work_added = &my_params->work_added;

    while (TRUE) {
        pthread_mutex_lock(jobs_lock);
        Jobs *jobs = my_params->jobs;

        /* Re-check the predicate in a loop: condition waits may wake spuriously. */
        while (jobs->size == 0 && jobs->terminate == FALSE) {
            pthread_cond_wait(work_added, jobs_lock);
        }

        if (jobs->size == 0) {
            /* Woken only because terminate was set; there is no job to remove. */
            pthread_mutex_unlock(jobs_lock);
            break;
        }

        /* Snapshot shared state while the lock is still held. */
        int terminate = jobs->terminate;
        JobFunction job = remove_job(jobs);
        int num_jobs_remaining = jobs->size;
        pthread_mutex_unlock(jobs_lock);

        /* Run the job outside the lock so the master can keep queueing work. */
        job();

        if (num_jobs_remaining == 0 && terminate == TRUE) {
            break;
        }
    }

    /* Returning from the start routine is equivalent to pthread_exit(NULL). */
    return NULL;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment