Skip to content

Instantly share code, notes, and snippets.

@yknishidate
Created November 4, 2023 03:34
Show Gist options
  • Save yknishidate/98249597796cb10f0de657272a9755d3 to your computer and use it in GitHub Desktop.
Save yknishidate/98249597796cb10f0de657272a9755d3 to your computer and use it in GitHub Desktop.
Job System
#pragma once
#include <condition_variable>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>
// Job: 任意のタスクを表すクラス
class Job {
public:
using JobFunction = std::function<void()>;
private:
JobFunction function;
public:
Job() = default;
Job(JobFunction func) : function(std::move(func)) {}
void execute() const {
if (function) {
function();
}
}
};
// JobQueue: スレッドセーフなジョブキュー
class JobQueue {
private:
std::queue<Job> jobs;
mutable std::mutex queueMutex;
std::condition_variable condition;
bool stopAll;
public:
JobQueue() : stopAll(false) {}
void push(Job job) {
std::lock_guard<std::mutex> lock(queueMutex);
jobs.push(std::move(job));
condition.notify_one();
}
bool waitAndPop(Job& job) {
std::unique_lock<std::mutex> lock(queueMutex);
condition.wait(lock, [this] { return stopAll || !jobs.empty(); });
if (!jobs.empty()) {
job = std::move(jobs.front());
jobs.pop();
return true;
}
return false; // キューが空であれば、falseを返す
}
bool empty() const {
std::lock_guard<std::mutex> lock(queueMutex);
return jobs.empty();
}
void shutdown() {
std::lock_guard<std::mutex> lock(queueMutex);
stopAll = true;
condition.notify_all();
}
};
// ThreadPool: ジョブを実行するスレッドのプール
class ThreadPool {
private:
std::vector<std::thread> workers;
JobQueue& jobQueue;
void workerFunction() const {
while (true) {
Job job;
if (!jobQueue.waitAndPop(job)) {
break; // シャットダウンフラグが立っているかキューが空の場合はループを抜ける
}
try {
job.execute(); // ジョブを実行
} catch (const std::exception& e) {
std::cerr << "Job execution failed: " << e.what() << std::endl;
}
}
}
public:
ThreadPool(JobQueue& queue, size_t threadCount) : jobQueue(queue) {
for (size_t i = 0; i < threadCount; ++i) {
workers.emplace_back(&ThreadPool::workerFunction, this);
}
}
// すべてのジョブを待機してワーカーを終了させる
void shutdown() {
jobQueue.shutdown(); // キューにシャットダウンを通知
// 全てのワーカースレッドを終了させる
for (auto& worker : workers) {
if (worker.joinable()) {
worker.join();
}
}
}
};
// JobSystem: ジョブを管理するクラス
class JobSystem {
private:
JobQueue jobQueue;
ThreadPool threadPool;
public:
JobSystem(size_t threadCount) : threadPool(jobQueue, threadCount) {}
// ジョブを追加する
void scheduleJob(Job::JobFunction jobFunction) { jobQueue.push(Job(std::move(jobFunction))); }
// シャットダウン処理
void shutdown() { threadPool.shutdown(); }
};
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment