Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
基于互斥锁和条件变量的线程池
#include<pthread.h>
#include<unistd.h>
#include<stdio.h>
#include<stdlib.h>
#include<functional>
#include<queue>
#include<iostream>
// Signature of the work a job carries: a callable with no arguments and no
// return value.
using func = std::function<void()>;

// A single unit of work handed to the thread pool.
struct Job {
    int id;         // caller-chosen identifier for the job
    func run_func;  // the callable a worker thread invokes to run the job
};
// Fixed-size thread pool built on a pthread mutex and condition variable.
//
// The constructor starts 8 worker threads. Each worker blocks in pop() until
// a job is queued with add(), runs the job's run_func and then deletes the
// job, so the pool owns every Job passed to add().
//
// Shutdown: stop() asks the workers to exit once the queue has been drained;
// join_all() waits for them. The destructor performs both. After stop(), join
// the workers through join_all() rather than calling pthread_join on
// `threads` directly, because the destructor joins any worker that join_all()
// has not joined yet and joining a thread twice is undefined.
struct ThreadPool {
    enum { kThreadCount = 8 };

    pthread_t threads[kThreadCount];  // worker thread handles
    pthread_mutex_t m;                // guards q and stopping
    pthread_cond_t cond;              // signalled when a job is queued or stop() is called
    std::queue<Job*> q;               // pending jobs, oldest first
    bool stopping;                    // set by stop(); read and written under m
    bool joined;                      // true once join_all() has joined the workers

    ThreadPool() : stopping(false), joined(false) {
        // The mutex and condition variable must be initialised BEFORE any
        // worker starts: a new worker enters pop() immediately and locks m.
        pthread_mutex_init(&m, nullptr);
        pthread_cond_init(&cond, nullptr);
        for (int i = 0; i < kThreadCount; i++) {
            const int rc = pthread_create(&threads[i], nullptr, work_func, this);
            if (rc != 0) {
                // threads[i] holds no valid handle on failure, so any later
                // join on it would be invalid; fail loudly instead.
                fprintf(stderr, "pthread_create failed: %d\n", rc);
                abort();
            }
        }
    }

    // Drains the queue, waits for every worker to exit, then releases the
    // synchronisation primitives.
    ~ThreadPool() {
        stop();
        join_all();
        pthread_cond_destroy(&cond);
        pthread_mutex_destroy(&m);
    }

    // The pool holds raw pthread primitives and running threads that point
    // back at it, so copying it is never meaningful.
    ThreadPool(const ThreadPool&) = delete;
    ThreadPool& operator=(const ThreadPool&) = delete;

    // Thread entry point. `args` is the owning ThreadPool. Runs and deletes
    // jobs until pop() reports shutdown by returning nullptr.
    static void* work_func(void* args) {
        ThreadPool* pool = static_cast<ThreadPool*>(args);
        while (Job* job = pool->pop()) {
            // Calling an empty std::function would throw bad_function_call
            // out of the thread, so such a job is simply discarded.
            if (job->run_func) {
                job->run_func();
            }
            delete job;
        }
        return nullptr;
    }

    // Blocks until a job is available and returns it (the caller then owns
    // it). Returns nullptr once stop() has been called and the queue is empty.
    Job* pop() {
        pthread_mutex_lock(&m);
        // pthread_cond_wait releases m while sleeping and re-acquires it
        // before returning. The condition is re-checked in a loop because a
        // wakeup may be spurious, or another worker may already have taken
        // the job that triggered the signal.
        while (q.empty() && !stopping) {
            printf("wait before\n");
            pthread_cond_wait(&cond, &m);
            printf("wait after\n");
        }
        Job* job = nullptr;
        if (!q.empty()) {
            job = q.front();
            q.pop();
        }
        pthread_mutex_unlock(&m);
        return job;
    }

    // Queues `job` and wakes one waiting worker. The pool takes ownership of
    // `job`. A null job is ignored; a job added after stop() is deleted
    // without being run, because no worker is guaranteed to pick it up.
    void add(Job* job) {
        if (job == nullptr) {
            return;
        }
        pthread_mutex_lock(&m);
        const bool rejected = stopping;
        if (!rejected) {
            q.push(job);
            pthread_cond_signal(&cond);
        }
        pthread_mutex_unlock(&m);
        if (rejected) {
            delete job;
        }
    }

    // Asks the workers to exit after the jobs already queued have run. Wakes
    // every waiting worker so that each one observes the flag. Safe to call
    // more than once.
    void stop() {
        pthread_mutex_lock(&m);
        stopping = true;
        pthread_cond_broadcast(&cond);
        pthread_mutex_unlock(&m);
    }

    // Waits for all workers to exit; blocks until stop() has been called and
    // the queue has drained. Repeated calls are no-ops. `joined` is not
    // synchronised, so call this from one thread only.
    void join_all() {
        if (joined) {
            return;
        }
        for (int i = 0; i < kThreadCount; i++) {
            pthread_join(threads[i], nullptr);
        }
        joined = true;
    }
};
int main() {
ThreadPool pool;
for(int i = 0;i < 100;i++) {
Job* job = new Job();
job->id = i;
job->run_func = [i]() {
printf("job %d\n", i);
};
pool.add(job);
}
for(int i = 0;i < 8;i++) {
pthread_join(pool.threads[i], NULL);
}
printf("main end\n");
return 0;
}
@growvv
Copy link
Author

growvv commented Dec 26, 2021

g++ -std=c++11 work_queue.cpp -o work_queue -lpthread

@growvv
Copy link
Author

growvv commented Dec 31, 2021

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment