Skip to content

Instantly share code, notes, and snippets.

@aneury1
Last active April 6, 2023 12:07
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save aneury1/de51b7564d41b918a146c9fcb6f5e5d4 to your computer and use it in GitHub Desktop.
Save aneury1/de51b7564d41b918a146c9fcb6f5e5d4 to your computer and use it in GitHub Desktop.
// Start the worker threads that drain `task_queue`.
//
// Each worker blocks on `condition` until a task is available, removes it
// from the queue while holding `queue_mutex`, then runs it with the mutex
// released so other workers can keep dequeuing.
//
// NOTE(review): the workers loop forever and are never joined here; there is
// no stop flag in this snippet, so shutdown must be handled elsewhere.
std::vector<std::thread> threads;
// hardware_concurrency() is only a hint and is allowed to return 0 when the
// value cannot be determined. Starting zero workers would leave every queued
// connection unserved, so fall back to a single worker in that case.
const unsigned int detected_cores = std::thread::hardware_concurrency();
int num_threads = (detected_cores == 0) ? 1 : static_cast<int>(detected_cores);
for (int i = 0; i < num_threads; ++i) {
    threads.emplace_back([&]() {
        while (true) {
            std::function<void()> task;
            {
                // Wait for a task to be added to the queue. The predicate
                // form re-checks the queue after every (possibly spurious)
                // wake-up.
                std::unique_lock<std::mutex> lock(queue_mutex);
                condition.wait(lock, [&]() { return !task_queue.empty(); });
                // Move the task out instead of copying the std::function.
                task = std::move(task_queue.front());
                task_queue.pop();
            }  // mutex released here, before the task executes
            // Execute the task
            task();
        }
    });
}
// Create the listening socket (IPv4, stream-oriented).
int listener_fd = ::socket(AF_INET, SOCK_STREAM, 0);
if (listener_fd == -1) {
    // Error handling
}

// Enable SO_REUSEADDR on the listener. The result of setsockopt is not
// checked.
int opt = 1;
::setsockopt(listener_fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));

// Address to listen on: port 8080 on every local interface. The structure is
// zero-initialised before the individual fields are filled in.
struct sockaddr_in server_addr{};
server_addr.sin_family = AF_INET;
server_addr.sin_port = htons(8080);
server_addr.sin_addr.s_addr = htonl(INADDR_ANY);

// Bind the socket to that address.
int bind_status = ::bind(listener_fd,
                         reinterpret_cast<struct sockaddr*>(&server_addr),
                         sizeof(server_addr));
if (bind_status == -1) {
    // Error handling
}

// Start listening, using the system maximum for the pending-connection
// backlog.
int listen_status = ::listen(listener_fd, SOMAXCONN);
if (listen_status == -1) {
    // Error handling
}
// Accept loop: hand every accepted connection to the thread pool.
while (true) {
    int client_fd = accept(listener_fd, nullptr, nullptr);
    if (client_fd == -1) {
        // accept() failed, so there is no connection to serve. Do not queue
        // a task for an invalid descriptor; try again.
        // NOTE(review): a persistent failure (e.g. an invalid listener) will
        // spin here; inspect errno and bail out once real error handling is
        // added.
        continue;
    }
    // Add a task to the thread pool to handle the connection.
    // client_fd is captured BY VALUE: the task runs later on a worker thread,
    // after this loop iteration (and the variable it would otherwise
    // reference) is gone.
    {
        std::lock_guard<std::mutex> lock(queue_mutex);
        task_queue.emplace([client_fd]() {
            handle_connection(client_fd);
            close(client_fd);
        });
    }  // release the queue before notifying and before blocking in accept()
    condition.notify_one();
}
/**
 * Serves one client connection: reads a request and answers with a fixed
 * "200 OK" response whose body is "Hello".
 *
 * Only a single read of at most 1024 bytes is performed, so a request that is
 * larger or arrives in several segments is seen only partially.
 *
 * The descriptor is not closed here; that remains the caller's job.
 *
 * @param client_fd Connected socket descriptor of the client.
 */
void handle_connection(int client_fd) {
    char buffer[1024];
    const auto num_bytes = ::read(client_fd, buffer, sizeof(buffer));
    if (num_bytes <= 0) {
        // -1: read error; 0: peer closed without sending anything.
        // Using a negative count as a string length would be converted to a
        // huge unsigned size, so stop before building the request.
        return;
    }
    // Parse the HTTP request
    std::string request(buffer, static_cast<std::string::size_type>(num_bytes));
    (void)request;  // parsing is not implemented yet
    // ...
    // Send the HTTP response
    const std::string response = "HTTP/1.1 200 OK\r\nContent-Length: 5\r\n\r\nHello";
    // send() may accept fewer bytes than requested; keep going until the
    // whole response has been handed to the socket or an error occurs.
    std::string::size_type total_sent = 0;
    while (total_sent < response.size()) {
        const auto bytes_sent = ::send(client_fd,
                                       response.data() + total_sent,
                                       response.size() - total_sent,
                                       0);
        if (bytes_sent <= 0) {
            // Error (or no progress): give up on this connection.
            return;
        }
        total_sent += static_cast<std::string::size_type>(bytes_sent);
    }
}
#include <condition_variable>
#include <cstddef>
#include <functional>
#include <future>
#include <iostream>
#include <memory>
#include <mutex>
#include <queue>
#include <stdexcept>
#include <thread>
#include <type_traits>
#include <utility>
#include <vector>
/**
 * Fixed-size pool of worker threads that execute queued tasks.
 *
 * Tasks are submitted with enqueue(), which returns a std::future for the
 * task's result. The destructor lets the workers finish every task that is
 * still queued and then joins them.
 */
class ThreadPool {
public:
    /**
     * Starts the worker threads.
     *
     * @param num_threads Number of workers to start. A value of 0 is treated
     *        as 1: a pool without workers would accept tasks whose futures
     *        never become ready.
     */
    ThreadPool(size_t num_threads) : stop(false) {
        if (num_threads == 0)
            num_threads = 1;
        threads.reserve(num_threads);
        try {
            for (size_t i = 0; i < num_threads; ++i)
                threads.emplace_back([this] { run_worker(); });
        } catch (...) {
            // Starting a thread failed part-way through. The workers that are
            // already running must be stopped and joined, otherwise
            // destroying their joinable std::thread objects would terminate
            // the program.
            shutdown();
            throw;
        }
    }

    // The workers hold a pointer to this object, so it must not be copied.
    ThreadPool(const ThreadPool&) = delete;
    ThreadPool& operator=(const ThreadPool&) = delete;

    /**
     * Queues `f(args...)` for execution on a worker thread.
     *
     * The callable and its arguments are bound (copied or moved) into the
     * task.
     *
     * @return A future that yields the call's result, or rethrows the
     *         exception it raised.
     * @throws std::runtime_error if the pool has already been stopped.
     */
    template<class F, class... Args>
    auto enqueue(F&& f, Args&&... args)
        -> std::future<typename std::result_of<F(Args...)>::type> {
        using return_type = typename std::result_of<F(Args...)>::type;
        // packaged_task is move-only while std::function requires a copyable
        // callable, so the task is held through a shared_ptr.
        auto task = std::make_shared<std::packaged_task<return_type()>>(
            std::bind(std::forward<F>(f), std::forward<Args>(args)...));
        std::future<return_type> res = task->get_future();
        {
            std::unique_lock<std::mutex> lock(queue_mutex);
            if (stop)
                throw std::runtime_error("enqueue on stopped ThreadPool");
            tasks.emplace([task]() { (*task)(); });
        }
        condition.notify_one();
        return res;
    }

    /** Stops the pool: remaining queued tasks are run, then workers are joined. */
    ~ThreadPool() {
        shutdown();
    }

private:
    /** Worker body: run tasks until the pool is stopped and the queue is empty. */
    void run_worker() {
        for (;;) {
            std::function<void()> task;
            {
                std::unique_lock<std::mutex> lock(queue_mutex);
                condition.wait(lock,
                               [this] { return stop || !tasks.empty(); });
                if (stop && tasks.empty())
                    return;
                task = std::move(tasks.front());
                tasks.pop();
            }
            // Run outside the lock so other workers can dequeue meanwhile.
            task();
        }
    }

    /** Sets the stop flag, wakes every worker and joins the started threads. */
    void shutdown() {
        {
            std::unique_lock<std::mutex> lock(queue_mutex);
            stop = true;
        }
        condition.notify_all();
        for (std::thread& thread : threads) {
            if (thread.joinable())
                thread.join();
        }
    }

    std::vector<std::thread> threads;          // worker threads
    std::queue<std::function<void()>> tasks;   // pending tasks, FIFO
    std::mutex queue_mutex;                    // guards `tasks` and `stop`
    std::condition_variable condition;         // signals new task / stop
    bool stop;                                 // set once shutdown begins
};
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment