Skip to content

Instantly share code, notes, and snippets.

@etorth
Created May 18, 2022 07:32
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save etorth/421b7c5db878eb3176c53f4a84effbfa to your computer and use it in GitHub Desktop.
/*
* =====================================================================================
*
* Filename: fvThreadPool.h
* Created: 01/23/2016 13:46:24
* Description:
*
* Version: 1.0
* Revision: none
* Compiler: g++ -std=c++14
*
* Author:
* Email:
* Organization:
*
* =====================================================================================
*/
#pragma once
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <cstdint>
#include <cstdlib>
#include <cstring>
#include <exception>
#include <functional>
#include <future>
#include <initializer_list>
#include <memory>
#include <mutex>
#include <numeric>
#include <queue>
#include <stdexcept>
#include <string>
#include <thread>
#include <type_traits>
#include <utility>
#include <vector>
// hack code for CCMPR02523786
// INTEL uses gcc4 and refuses to upgrade to gcc6 as our tool builtin with
// when they use gcc4 to build their testbench, the dynamic loader incorrectly loaded std::future::get<void>() from our libsideFile_xlm.so <- libuart.so <- tb
// this hack eliminates any symbol std::future::get<void>() in libsideFile_xlm.so, use std::future::get<_hack_stupid_type_void_t>() instead
// undo this hack later when INTEL realize how stupid it is
//
// empty-payload stand-in for void as a std::future<T> result type
struct _hack_stupid_type_void_t
{
int unused = 0; // never read, only here so the struct has a well-defined size
};
#define disableParallelOnEnv(envName) [](){const static bool envset = std::getenv("" envName); return envset;}()
// INTERFACE:
// const int fvThreadPool::poolSize
// // how many threads used in the pool, zero if pool is disabled
//
// const bool fvThreadPool::disablePool
// // true if pool is disabled
// // when disabled all added task to the pool get ran in fvThreadPool::addTask()
//
// size_t fvThreadPool::hwThreadCount()
// // thread count helper function
// // returns std::hardware_concurrency() if it's non-zero, otherwise returns 16
//
// size_t fvThreadPool::limitedThreadCount(size_t limitedNumThread)
// // thread count helper function
// // returns std::min<size_t>(limitedNumThread, fvThreadPool::hwThreadCount()) if limitedNumThread is not zero, otherwise return fvThreadPool::hwThreadCount()
//
// fvThreadPool(size_t numThread, bool disable = false)
// // create pool with a specified numThread, uses fvThreadPool::hwThreadCount() if numThread == 0
// // example:
// //
// // fvThreadPool pool(12, std::getenv("DISABLE_POOL"));
// // pool.addTask(...);
// //
// // notes:
// // 1. specify numThread = 1 to force all posted tasks running sequentially in the single pool thread, won't block the calling thread
// // 2. if disabled, all tasks sequentially get executed in addTask(), this blocks the calling thread
//
// fvThreadPool(std::initializer_list<std::function<void(int)>>, bool disable = false)
// fvThreadPool(fvThreadPool::abortedTag &hasError, std::initializer_list<std::function<void(int)>>, bool disable = false)
// // create pool with size same as list size, each task gets automatically added by fvThreadPool::addTask(), disable is as above
// // example:
// //
// // fvThreadPool
// // {
// // // list all tasks to get ran in parallel
// // // each task get 1 thread
// // {
// // [](int) { call_task1(); },
// // [](int) { call_task2(); },
// // [](int) { call_task3(); },
// // [](int) { call_task4(); },
// // },
// //
// // std::getenv("DISABLE_POOL"))
// // };
// //
//
// void fvThreadPool::addTask(Callable && task)
// void fvThreadPool::addTask(fvThreadPool::abortedTag &, Callable && task)
// // add a task to the pool to execute, task get blocking-ran in addTask() if pool is disabled
// // the signature of the task added should be equivalent to the following:
// //
// // void task(int threadId);
// //
// // all added task will get an internal copy by copy/move-constructor if using thread pool
// // example:
// //
// // fvThreadPool pool(12, std::getenv("DISABLE_POOL"));
// // fvThreadPool::abortedTag hasError;
// //
// // pool.addTask([](int){ call_non_throw_task(); });
// // pool.addTask(hasError, [](int){ call_task1(); });
// // pool.addTask(hasError, [](int){ call_task2(); });
// //
// // pool.finish();
// // hasError.checkError();
// //
// // notes:
// // 1. don't call addTask() after fvThreadPool::finish()
// // 2. don't call fvThreadPool::finish() inside the Callable
//
// void fvThreadPool::addForTask(size_t begin, size_t end, ForCallable && task)
// void fvThreadPool::addForTask(fvThreadPool::abortedTag &, size_t begin, size_t end, ForCallable && task)
// // call task(threadId, i) with i in [begin, end) in parallel
// // the signature of the task added should be equivalent to the following:
// //
// // void task(int threadId, size_t index);
// //
// // task will have an unique internal copy, all i in [begin, end) get called based on this copy
// // example:
// //
// // struct call_counter
// // {
// // std::atomic<int> count = 0;
// // void operator () (int, size_t)
// // {
// // count++;
// // }
// //
// // ~call_counter()
// // {
// // std::printf("%p get called %d times.\n", this, count.load());
// // }
// // };
// //
// // pool.addForTask(10, 299, call_counter());
// //
//
// void fvThreadPool::finish() noexcept
// // join and release all threads in the pool, when returns all added tasks should be done
// // after this function the thread pool can NOT be reused, pool is dead
// // notes:
// // 1. this function can be called multiple times, only the first call takes effect
// // 2. this function get called automatically in fvThreadPool destructor
//
// ~fvThreadPool()
// // call finish() if not explicitly get called
// INTERFACE
// env: FV_GLOBAL_THREAD_POOL_SIZE
// // define the number of threads in the internal global pool
// // if not set then uses fvThreadPool::hwThreadCount()
//
// env: DISABLE_FV_GLOBAL_THREAD_POOL
// // disable the global pool
// // all posted task get blocking-executed at post time
//
// std::future<T> fvGlobalThreadPool::postEvalTask(EvalCallable && evalTask)
// // post an eval-task to the global pool and return a std::future<T>, evalTask get an internal copy
// // the signature of evalTask posted should be equivalent to the following:
// //
// // T evalTask(int threadId);
// //
// // evalTask only accepts an integral parameter to pass the thread id
// // any extra parameters should be passed as lambda captures
// // example:
// //
// // const int parm = 128;
// // auto f = fvGlobalThreadPool::postEvalTask([parm](int threadId) -> std::string
// // {
// // if(parm < 0){
// // throw std::runtime_error(std::string("invalid parm = ") + std::to_string(parm));
// // }
// //
// // int sum = 0;
// // for(int i = 0; i < parm; ++i){
// // sum += i;
// // }
// // return std::string("threadId ") + std::to_string(threadId) + "gets sum: " + std::to_string(sum);
// // });
// //
// // // some other logic
// // // ...
// //
// // std::cout<< f.get() << std::endl;
// //
// // notes:
// // 1. all exceptions will be forward to thread calling std::future<T>::get()
// // 2. if discard the returned std::future<T>, the postEvalTask() won't block, this is not like std::async
//
// fvGlobalThreadPool::parallelFor(size_t begin, size_t end, const ForCallable & forTask, bool disableParallelFor = false)
// // if the global pool is not disabled, splits the given range [begin, end) evenly into N groups, where N = fvGlobalThreadPool::getGlobalPool().poolSize
// // post each group by fvGlobalThreadPool::postEvalTask(), it blocks till all N groups finishes or throws
// // example:
// //
// // fvGlobalThreadPool::parallelFor(102, 299, [](int threadId, size_t i)
// // {
// // std::printf("thread id = %d, i = %zu\n", threadId, i);
// // if(i % 128 == 0){
// // throw std::runtime_error(std::string("throw in thread id = ") + std::to_string(threadId));
// // }
// // });
// //
// // notes:
// // 1. it blocks until all calls on [begin, end) finish
// // 2. all calls access the forTask const-ref, there is no copy of it
// // 3. all exception thrown by forTask will be forward to calling thread automatically
//
// fixed-size, one-shot thread pool with a priority-leveled task queue
// notes:
// 1. pool can not be restarted after finish()
// 2. recursive task posting (a task adding sub-tasks to its own pool) is only allowed via fvGlobalThreadPool
// 3. with waitOnIdle > 0, worker threads are spawned lazily in addTask() and retire after being idle for waitOnIdle seconds
class fvThreadPool
{
public:
friend class fvGlobalThreadPool; // only global pool support recursive task
public:
class deadPoolError: public std::exception // give a distinct type to detect dead pool
{
// NOTE(review): what() is private here (class defaults to private access)
// callers can still reach it through a std::exception reference since member access is checked on the static type
const char *what() const noexcept override
{
return "deadPoolError";
}
};
// simple scope guard wrapper
// didn't check any exception when construct/destruct
class scopeGuard final
{
private:
std::function<void()> m_cb; // cleanup callback, runs once at scope exit
public:
template<typename ScopeCleanFunc> explicit scopeGuard(ScopeCleanFunc && func)
: m_cb(std::forward<ScopeCleanFunc>(func))
{}
private:
scopeGuard (const scopeGuard &) = delete;
scopeGuard & operator = (const scopeGuard &) = delete;
public:
~scopeGuard()
{
if(m_cb){
m_cb();
}
}
};
public:
// records the first exception thrown by any task wrapped with threadCBWrapper()
class abortedTag final
{
private:
friend class fvThreadPool;
private:
std::exception_ptr m_except; // first captured exception, written at most once
private:
std::atomic<bool> m_tag {false}; // set once any wrapped task has thrown, later wrapped tasks become no-op
public:
// rethrow the first captured exception, no-op if no task failed
// NOTE(review): threadCBWrapper() sets m_tag before assigning m_except, so calling
// checkError() while tasks are still running may observe m_tag set with m_except still
// empty and hit the "missing exception" path — safe in the documented usage: call
// pool.finish() first, then checkError()
void checkError() const
{
if(!m_tag.load()){
return;
}
if(!m_except){
throw std::runtime_error("fvThreadPool::abortedTag: missing exception");
}
std::rethrow_exception(m_except);
}
};
private:
// a queued task, higher level means deeper recursive posting and gets popped first
struct innTaskNode
{
size_t level = 0;
std::function<void(int)> task;
bool operator < (const innTaskNode &param) const noexcept
{
return this->level < param.level;
}
};
// per-worker bookkeeping slot, indexed by thread id
struct innWorkerThread
{
bool idle = false; // true when the slot has no live thread: not spawned yet (lazy mode) or retired on idle timeout
size_t level = 0; // level of the task this worker is currently executing
std::thread threadHandle {};
};
public:
const size_t poolSize;
const bool disablePool;
private:
const uint64_t m_threadPoolID; // process-wide unique pool id, used to detect recursive addTask()
const bool m_allowRecursive;
private:
bool m_stop = false; // guarded by m_lock in pool mode
private:
const int m_waitOnIdle; // in seconds
private:
std::mutex m_lock;
std::condition_variable m_condition;
private:
size_t m_idleThreadCount = 0; // count of workers currently blocked waiting for a task
private:
std::priority_queue<innTaskNode> m_taskQ;
private:
// the worker lambda accesses *this* member variables
// make sure all member variables has been ready before start any worker thread
//
// innWorkerThread::idle : true means current slot has been marked as idle (no live thread)
// innWorkerThread::threadHandle : thread handle
//
std::vector<innWorkerThread> m_workers;
private:
// build the main-loop lambda for worker thread threadId
auto getThreadFunc(int threadId)
{
return [this, threadId]()
{
// latch this thread's id and owning pool id into thread_local storage, see getCurrThreadID()
getCurrThreadID(threadId);
getCurrThreadPoolID(m_threadPoolID);
while(true){
innTaskNode currTask;
{
std::unique_lock<std::mutex> lockGuard(m_lock);
m_idleThreadCount++;
// when a thread is trying to pick a task and task queue is not empty
// this thread shall not count for m_idleThreadCount
// but is fine because when task queue is not empty
// following lambda returns immediately without releasing the lock
// then outside world can not observe the m_idleThreadCount increments then decrements
const auto eval = [&lockGuard, this]() -> bool
{
const auto pred = [this]() -> bool
{
// stop waiting if:
// 1. pool is stopping
// 2. or there is a task in the queue
return m_stop || !m_taskQ.empty();
};
if(m_waitOnIdle > 0){
// returns false when timed out with pred() still false
return m_condition.wait_for(lockGuard, std::chrono::seconds(m_waitOnIdle), pred);
}
else{
// no timeout enabled
// equivalent to wait_for(INT_MAX, pred)
m_condition.wait(lockGuard, pred);
return true;
}
}();
// thread has re-acquired lock
// when reaches here, thread already has something to do, either execute task or exit
m_idleThreadCount--;
if(eval){
if(m_stop && m_taskQ.empty()){
return;
}
else{
// NOTE(review): priority_queue::top() returns a const reference, so this move actually copies the node
currTask = std::move(m_taskQ.top());
m_taskQ.pop();
}
}
else{
// after maxWaitTime pred() still returns false
// means pool is not stopped but didn't get any new tasks in maxWaitTime, aka idle for maxWaitTime, stop *current* thread
m_workers.at(threadId).idle = true;
return;
}
}
// no exception handling
// use threadCBWrapper() if task may throw, otherwise the exception breaks the pool
// thread-level gets assigned always and only before executing a task
// thread-level can only be r/w-accessed by its own thread, worker thread j shall not access level[i] when i != j
m_workers[threadId].level = currTask.level;
currTask.task(threadId);
}
};
}
private:
// thread_local latch: the first call in a given thread fixes the stored value
// later calls ignore the argument and return the value latched at first call
// returns -1 for threads that never set an id (i.e. non-worker threads)
static int getCurrThreadID(int threadID = -1)
{
const thread_local int t_threadID = threadID;
return t_threadID;
}
// same latch trick for the owning pool id, 0 means current thread doesn't belong to any pool
static uint64_t getCurrThreadPoolID(uint64_t threadPoolID = 0)
{
const thread_local uint64_t t_threadPoolID = threadPoolID;
return t_threadPoolID;
}
private:
fvThreadPool(size_t numThread, bool disable, int waitOnIdle, bool allowRecursive) // for fvGlobalThreadPool only to enable recursive task
: poolSize([numThread, disable]() -> size_t
{
if(disable){
return 0;
}
if(numThread > 0){
return numThread;
}
return fvThreadPool::hwThreadCount();
}())
, disablePool(disable)
, m_threadPoolID([]() -> uint64_t
{
// id 0 is reserved to mean "no pool", see getCurrThreadPoolID()
static std::atomic<uint64_t> threadPoolID {1};
return threadPoolID++;
}())
, m_allowRecursive(allowRecursive)
, m_waitOnIdle([waitOnIdle]() -> int
{
if(waitOnIdle < 0){
throw std::runtime_error(std::string("invalid wait time: ") + std::to_string(waitOnIdle));
}
else{
return waitOnIdle;
}
}())
{
if(disablePool){
return;
}
// the thread lambda accesses *this*
// so pre-allocate m_workers before start any threads
m_workers.resize(poolSize);
// defer the thread spawn if we enabled the waitOnIdle
// only pre-allocate the worker slots
// should be very careful for future change
// when calling finish() to exit the pool, the pool may not even spawn any threads yet, so thread::join() in finish() won't happen
for(size_t threadId = 0; threadId < poolSize; ++threadId){
if(m_waitOnIdle > 0){
m_workers[threadId].idle = true;
}
else{
m_workers[threadId].idle = false;
m_workers[threadId].threadHandle = std::thread(getThreadFunc(threadId));
}
}
}
public:
fvThreadPool(size_t numThread, bool disable = false, int waitOnIdle = 0)
: fvThreadPool(numThread, disable, waitOnIdle, false)
{}
// one thread per listed task, each task gets added through addTask()
fvThreadPool(std::initializer_list<std::function<void(int)>> taskList, bool disable = false, int waitOnIdle = 0)
: fvThreadPool(taskList.size(), disable, waitOnIdle)
{
for(auto task: taskList){
addTask(std::move(task));
}
}
// same as above, but wraps each task with threadCBWrapper(hasError, ...) for exception capture
fvThreadPool(fvThreadPool::abortedTag &hasError, std::initializer_list<std::function<void(int)>> taskList, bool disable = false, int waitOnIdle = 0)
: fvThreadPool(taskList.size(), disable, waitOnIdle)
{
for(auto task: taskList){
addTask(hasError, std::move(task));
}
}
public:
virtual ~fvThreadPool()
{
finish();
}
public:
// add a task void task(int threadId) to the pool
// blocking-runs the task inline when the pool is disabled
// throws deadPoolError after finish()
template<typename Callable> void addTask(Callable && task)
{
if(disablePool){
if(m_stop){
throw deadPoolError();
}
task(0);
return;
}
// in thread pool mode
// m_stop need to be protected before access
{
std::unique_lock<std::mutex> lockGuard(m_lock);
if(m_stop){
throw deadPoolError();
}
// a task adding subtask in same thread pool is called recursive task
// we don't trace which worker thread does task-adding, worker thread j added task then executed by worker thread i is a recursive task
const auto recursiveTask = (getCurrThreadPoolID() == m_threadPoolID);
if(recursiveTask){
if(m_allowRecursive){
if(m_idleThreadCount == 0){
// count retired/not-yet-spawned slots that could be restored to run the task
const auto deadThreadCount = std::accumulate(m_workers.begin(), m_workers.end(), 0, [](auto curr, const auto &worker)
{
return curr + (worker.idle ? 1 : 0);
});
// there is no thread waiting for new task, nor possible thread slots
// if push to task queue, it's possible that there is no thread going to execute it if all worker threads are under-hold
// execute task immediately
// the task itself can recursively call addTask()
if(deadThreadCount == 0){
lockGuard.unlock();
task(getCurrThreadID());
return;
}
}
}
else{
throw std::runtime_error("fvThreadPool::addTask: recursive task added");
}
}
// post task to task queue
// we are sure the task can get a worker thread for execution
// TODO is this really good?
// when we post current task with level[threadId] + 1, it may not be at the top of task queue
// but should be fine, since even it's not on top
// parent-task that depends on this task has lower level, and which holds current thread only
if(m_waitOnIdle > 0){
// lazy mode: restore enough idle slots to cover all pending tasks plus this one
const auto taskCntPending = m_taskQ.size() + 1;
for(size_t i = 0, restored = 0; i < m_workers.size() && restored < taskCntPending; ++i){
if(m_workers[i].idle){
// join the retired thread (if any) before reusing its slot
if(m_workers[i].threadHandle.joinable()){
m_workers[i].threadHandle.join();
}
restored++;
m_workers[i].idle = false;
m_workers[i].threadHandle = std::thread(getThreadFunc((int)(i)));
}
}
}
// prefer recursive tasks, not task that added by non-worker threads, otherwise may cause starvation
// only fvGlobalThreadPool can add recursive tasks, otherwise alternating-recursive-task-adding by two pools can mess up everything
innTaskNode newTask;
newTask.level = recursiveTask ? (m_workers[getCurrThreadID()].level + 1) : 0;
newTask.task = std::forward<Callable>(task); // TODO move outside of lock
m_taskQ.push(std::move(newTask));
}
m_condition.notify_one();
}
// add a task wrapped with exception capture, check hasError.checkError() after finish()
template<typename Callable> void addTask(fvThreadPool::abortedTag &hasError, Callable && task)
{
addTask(fvThreadPool::threadCBWrapper(hasError, std::forward<Callable>(task)));
}
private:
// split [beginIndex, endIndex) into poolSize groups and post one task per group
// hasErrorPtr == nullptr means no exception capture (task must not throw)
template<typename ForCallable> void addForTaskHelper(fvThreadPool::abortedTag *hasErrorPtr, size_t beginIndex, size_t endIndex, ForCallable && forTask)
{
if(beginIndex >= endIndex){
return;
}
if(disablePool){
if(m_stop){
throw deadPoolError();
}
for(size_t i = beginIndex; i < endIndex; ++i){
forTask(0, i);
}
return;
}
// best effort to make all calls accessing same functor
// otherwise if use capture by value then different group accesses different functor
const size_t groupSize = (endIndex - beginIndex + poolSize - 1) / poolSize;
const auto taskObjPtr = std::make_shared<std::function<void(int, size_t)>>(std::forward<ForCallable>(forTask));
for(size_t group = 0; group < poolSize; ++group){
const size_t groupBegin = beginIndex + group * groupSize;
const size_t groupEnd = std::min<size_t>(groupBegin + groupSize, endIndex);
if(groupBegin >= groupEnd){
break;
}
const auto fnLoop = [groupBegin, groupEnd, taskObjPtr](int threadId)
{
for(size_t i = groupBegin; i < groupEnd; ++i){
(*taskObjPtr)(threadId, i);
}
};
if(hasErrorPtr){
addTask(*hasErrorPtr, fnLoop);
}
else{
addTask(fnLoop);
}
}
}
public:
// call task(threadId, i) with i in [beginIndex, endIndex) in parallel
template<typename ForCallable> void addForTask(size_t beginIndex, size_t endIndex, ForCallable && forTask)
{
addForTaskHelper(nullptr, beginIndex, endIndex, std::forward<ForCallable>(forTask));
}
// same as above, with exception capture through hasError
template<typename ForCallable> void addForTask(fvThreadPool::abortedTag &hasError, size_t beginIndex, size_t endIndex, ForCallable && forTask)
{
addForTaskHelper(&hasError, beginIndex, endIndex, std::forward<ForCallable>(forTask));
}
public:
// join and release all threads, pool is dead afterwards
// idempotent: only the first call takes effect, also called from the destructor
void finish() noexcept
{
if(disablePool){
m_stop = true;
return;
}
// in thread pool mode
// m_stop need to be protected before access
{
std::unique_lock<std::mutex> lockGuard(m_lock);
if(m_stop){
return;
}
m_stop = true;
}
// notify all and join
// this implementation doesn't take care of exception
// need to make sure whenever a task get posted, there is an active thread running, especially when supporting lazy thread spawn in ctor()
// otherwise here the join() may not happen because thread may not get spawned yet
// in which case it causes posted task not get ran after finish() returns
m_condition.notify_all();
for(auto &worker: m_workers){
if(worker.threadHandle.joinable()){
worker.threadHandle.join();
}
}
// release thread objects back to system
// after fvThreadPool::finish() the pool is dead, can't restart it
m_workers.clear();
}
public:
// clamp limitedNumThread to the hardware thread count, 0 means "use hardware count"
static size_t limitedThreadCount(size_t limitedNumThread)
{
const auto hwThreadCnt = hwThreadCount();
if(limitedNumThread > 0){
return std::min<size_t>(limitedNumThread, hwThreadCnt);
}
return hwThreadCnt;
}
// hardware_concurrency() with a fallback of 16 when the implementation reports 0
static size_t hwThreadCount()
{
const size_t hwThreadCnt = std::thread::hardware_concurrency();
return hwThreadCnt > 0 ? hwThreadCnt : 16;
}
public:
// helper wrapper to handle exception in fvThreadPool
// example:
//
// fvThreadPool::abortedTag hasError;
//
// pool.addTask(fvThreadPool::threadCBWrapper(hasError, [](int){ /* code */ }));
// pool.addTask(fvThreadPool::threadCBWrapper(hasError, [](int){ /* code */ }));
// pool.addTask(fvThreadPool::threadCBWrapper(hasError, [](int){ /* code */ }));
//
// pool.finish();
// hasError.checkError();
//
template<typename Callable> static auto threadCBWrapper(fvThreadPool::abortedTag &hasError, Callable && task)
{
return [&hasError, taskCpy = std::forward<Callable>(task)](int threadId)
{
// once any wrapped task has thrown, skip remaining work
if(hasError.m_tag.load()){
return;
}
try{
taskCpy(threadId);
}
catch(...){
bool expected = false;
if(hasError.m_tag.compare_exchange_strong(expected, true)){
hasError.m_except = std::current_exception();
}
else{
// only capture and forward the first exception
// ignore all the rest
}
}
};
}
};
// process-wide singleton thread pool, static-only interface
// configured by env vars: FV_GLOBAL_THREAD_POOL_SIZE, DISABLE_FV_GLOBAL_THREAD_POOL,
// FV_GLOBAL_THREAD_POOL_WAIT_ON_IDLE, FV_GLOBAL_THREAD_POOL_DISABLE_RECURSIVE_TASK
class fvGlobalThreadPool final
{
private:
fvGlobalThreadPool() = delete; // never instantiated
private:
// lazily-created singleton pool, env vars are read once at first use
static fvThreadPool &getGlobalPool()
{
static fvThreadPool globalPool([]() -> int
{
if(const auto p = std::getenv("FV_GLOBAL_THREAD_POOL_SIZE")){
int result = -1;
try{
result = std::stoi(p);
}
catch(...){
// leave result at -1, fall through to the error below
}
if(result >= 0){
return result;
}
throw std::runtime_error(std::string("invalid FV_GLOBAL_THREAD_POOL_SIZE: ") + p);
}
// CCR2457805
// global thread persist during the whole run
// don't use the full hardware concurrency to spawn too many idle threads
//
// TODO: 1. add thread manager to kill idle threads when low loading
// 2. or switch to tbb
return 0;
}(),
std::getenv("DISABLE_FV_GLOBAL_THREAD_POOL"),
[]() -> int
{
if(const auto p = std::getenv("FV_GLOBAL_THREAD_POOL_WAIT_ON_IDLE")){
int result = -1;
try{
result = std::stoi(p);
}
catch(...){
// leave result at -1, fall through to the error below
}
if(result >= 0){
return result;
}
throw std::runtime_error(std::string("invalid FV_GLOBAL_THREAD_POOL_WAIT_ON_IDLE: ") + p);
}
return 10;
}(),
std::getenv("FV_GLOBAL_THREAD_POOL_DISABLE_RECURSIVE_TASK") ? false : true);
return globalPool;
}
public:
// post an eval-task T evalTask(int threadId) to the global pool, returns std::future<T>
// exceptions thrown by evalTask are forwarded to the thread calling future::get()
// unlike std::async, discarding the returned future does not block
template<typename EvalCallable> static auto postEvalTask(EvalCallable && evalTask)
{
#if __cplusplus >= 201703L
using EvalResultType = typename std::invoke_result_t<EvalCallable, int>;
#else
using EvalResultType = typename std::result_of_t<EvalCallable(int)>;
#endif
// use shared_ptr instead of instance/move
// otherwise need to mark posted lambda through pool.addTask() as mutable
auto packedTaskPtr = std::make_shared<std::packaged_task<EvalResultType(int)>>([taskCpy = std::forward<EvalCallable>(evalTask)](int threadId) -> EvalResultType
{
return taskCpy(threadId);
});
getGlobalPool().addTask([packedTaskPtr](int threadId)
{
(*packedTaskPtr)(threadId);
});
return packedTaskPtr->get_future();
}
public:
// call forTask(threadId, i) for i in [beginIndex, endIndex) in parallel
// blocks until every call finishes, all calls access forTask by const reference (no copy)
// the first exception thrown by any call is rethrown in the calling thread
template<typename ForCallable> static void parallelFor(size_t beginIndex, size_t endIndex, const ForCallable & forTask, bool disableParallelFor = false)
{
if(beginIndex >= endIndex){
return;
}
const auto &pool = getGlobalPool();
if(pool.disablePool || disableParallelFor){
// won't check pool.m_stop here
// because 1. we may need acquire pool.m_lock if check it
// 2. this pool is internal and can NOT be stopped externally
for(size_t i = beginIndex; i < endIndex; ++i){
forTask(0, i);
}
return;
}
// over-split to 4 groups per pool thread for better load balance
const size_t numTasks = pool.poolSize * 4;
const size_t groupSize = (endIndex - beginIndex + numTasks - 1) / numTasks;
std::vector<std::future<_hack_stupid_type_void_t>> forTaskResult;
forTaskResult.reserve(numTasks);
for(size_t group = 0; group < numTasks; ++group){
const size_t groupBegin = beginIndex + group * groupSize;
const size_t groupEnd = std::min<size_t>(groupBegin + groupSize, endIndex);
if(groupBegin >= groupEnd){
break;
}
forTaskResult.push_back(postEvalTask([groupBegin, groupEnd, &forTask](int threadId) -> _hack_stupid_type_void_t
{
for(size_t i = groupBegin; i < groupEnd; ++i){
forTask(threadId, i);
}
return {};
}));
}
// BUGFIX: drain every future before rethrowing
// previously the first throwing get() propagated immediately and abandoned the remaining
// futures, leaving posted groups still running with a dangling &forTask reference after the
// caller unwinds; now wait for all groups, then rethrow the first captured exception
std::exception_ptr firstError;
for(auto &taskResult: forTaskResult){
try{
taskResult.get();
}
catch(...){
if(!firstError){
firstError = std::current_exception();
}
}
}
if(firstError){
std::rethrow_exception(firstError);
}
}
// run every task in taskList in parallel, blocks until all finish
// the first exception thrown by any task is rethrown in the calling thread
static void waitEvalTaskList(std::initializer_list<std::function<void(int)>> taskList, bool disableParallelRun = false)
{
if(getGlobalPool().disablePool || disableParallelRun){
// won't check getGlobalPool().m_stop here
// because 1. we may need acquire getGlobalPool().m_lock if check it
// 2. this pool is internal and can NOT be stopped externally
for(auto &task: taskList){
task(0);
}
return;
}
std::vector<std::future<_hack_stupid_type_void_t>> result;
result.reserve(taskList.size());
for(auto &task: taskList){
result.push_back(postEvalTask([&task](int threadId) -> _hack_stupid_type_void_t
{
task(threadId);
return {};
}));
}
// BUGFIX: drain every future before rethrowing
// previously the first throwing get() propagated immediately while posted tasks could still be
// running with a dangling &task reference into the caller's initializer list; now wait for all
// tasks, then rethrow the first captured exception
std::exception_ptr firstError;
for(auto &f: result){
try{
f.get();
}
catch(...){
if(!firstError){
firstError = std::current_exception();
}
}
}
if(firstError){
std::rethrow_exception(firstError);
}
}
};
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment