Skip to content

Instantly share code, notes, and snippets.

@rpoisel
Last active September 23, 2019 11:54
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save rpoisel/fdff47f0f9451f2b3c9ff4777299dbeb to your computer and use it in GitHub Desktop.
Save rpoisel/fdff47f0f9451f2b3c9ff4777299dbeb to your computer and use it in GitHub Desktop.
PoolManager for WS workers
#include <algorithm>
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <functional>
#include <iostream>
#include <mutex>
#include <thread>
constexpr size_t const NUM_WORKER_THREADS = 3;
using WorkerFunc = std::function<void(void)>;
class Worker {
public:
Worker()
: func(nullptr), cleanup(false),
thr(std::thread(&Worker::func_wrapper, this)) {}
~Worker() {}
bool isUsed() { return func ? true : false; }
void setFunc(WorkerFunc &func) {
std::unique_lock<std::mutex> lk(m);
if (this->func) {
// throw exception
}
this->func = func;
lk.unlock();
cv.notify_one();
}
void shutdown() {
std::unique_lock<std::mutex> lk(m);
cleanup = true;
lk.unlock();
cv.notify_one();
thr.join();
}
private:
void func_wrapper(void) {
while (!cleanup) {
std::unique_lock<std::mutex> lk(m);
cv.wait(lk, [&] { return (cleanup || func); });
if (!cleanup) {
func();
}
func = nullptr; // make space available for next job
}
}
WorkerFunc func;
std::atomic_bool cleanup;
std::thread thr;
std::mutex m;
std::condition_variable cv;
};
class PoolManager {
public:
PoolManager() = default;
bool process(WorkerFunc func) {
auto worker = std::find_if(workers, workers + NUM_WORKER_THREADS,
[](Worker &w) { return !w.isUsed(); });
if (worker == std::end(workers)) {
return false;
}
worker->setFunc(func);
return true;
}
void shutdown() {
for (auto &worker : workers) {
worker.shutdown();
}
}
private:
Worker workers[NUM_WORKER_THREADS];
};
void handleClient(int fd) { std::cout << fd << std::endl; }
int main(void) {
using namespace std::chrono_literals;
PoolManager p;
for (size_t cnt = 0; cnt < NUM_WORKER_THREADS + 2; cnt++) {
if (!p.process([=]() { handleClient(cnt); })) {
std::cout << "Could not allocate worker in pool." << std::endl;
}
// std::this_thread::sleep_for(0.5s);
}
std::this_thread::sleep_for(1s);
p.shutdown();
return 0;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment