Skip to content

Instantly share code, notes, and snippets.

@jart
Created May 25, 2018 19:52
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save jart/84b96f56b92f21eb9ed2ffc4a372dedf to your computer and use it in GitHub Desktop.
Save jart/84b96f56b92f21eb9ed2ffc4a372dedf to your computer and use it in GitHub Desktop.
--- /tmp/NonBlockingThreadPool.h 2018-05-25 12:50:34.146863121 -0700
+++ NonBlockingThreadPool.h 2018-05-25 12:48:54.000000000 -0700
@@ -10,6 +10,7 @@
#ifndef EIGEN_CXX11_THREADPOOL_NONBLOCKING_THREAD_POOL_H
#define EIGEN_CXX11_THREADPOOL_NONBLOCKING_THREAD_POOL_H
+#include <iostream>
namespace Eigen {
@@ -19,10 +20,10 @@
typedef typename Environment::Task Task;
typedef RunQueue<Task, 1024> Queue;
- NonBlockingThreadPoolTempl(int num_threads, Environment env = Environment())
- : NonBlockingThreadPoolTempl(num_threads, true, env) {}
+ NonBlockingThreadPoolTempl(int num_threads, std::vector<int> &proc_set, Environment env = Environment())
+ : NonBlockingThreadPoolTempl(num_threads, proc_set, true, env) {}
- NonBlockingThreadPoolTempl(int num_threads, bool allow_spinning,
+ NonBlockingThreadPoolTempl(int num_threads, std::vector<int> &proc_set, bool allow_spinning,
Environment env = Environment())
: env_(env),
num_threads_(num_threads),
@@ -36,6 +37,12 @@
done_(false),
cancelled_(false),
ec_(waiters_) {
+ if(proc_set.size() != 0) {
+ proc_set_.insert(proc_set_.begin(), proc_set.begin(), proc_set.end());
+ num_threads_ = proc_set_.size();
+ }
+
+ thread_bound_.resize(num_threads_);
waiters_.resize(num_threads_);
// Calculate coprimes of num_threads_.
@@ -67,6 +74,13 @@
}
~NonBlockingThreadPoolTempl() {
+ if(proc_set_.size() != 0){
+ std::cout << "NonBlockingThreadPool: " << (long) this << " bound to: ";
+ for(int i=0; i<thread_bound_.size(); i++)
+ std::cout << thread_bound_[i] << " ";
+ std::cout << "\n";
+ }
+
done_ = true;
// Now if all threads block without work, they will start exiting.
@@ -155,7 +169,7 @@
};
Environment env_;
- const int num_threads_;
+ int num_threads_;  // no longer const: the constructor overwrites it with proc_set_.size() when a non-empty CPU set is passed
const bool allow_spinning_;
MaxSizeVector<Thread*> threads_;
MaxSizeVector<Queue*> queues_;
@@ -166,9 +180,39 @@
std::atomic<bool> done_;
std::atomic<bool> cancelled_;
EventCount ec_;
+ std::vector<int> proc_set_;  // CPU ids copied from the constructor's proc_set argument; WorkerLoop(thread_id) pins its thread to proc_set_[thread_id]. Empty means no pinning.
+ std::vector<int> thread_bound_;  // per-worker slot: highest CPU index found in the mask read back by pthread_getaffinity_np; printed by the destructor
// Main worker thread loop.
void WorkerLoop(int thread_id) {
+ if(proc_set_.size() != 0) {
+ //set the thread affinity
+ cpu_set_t cpuset;
+ CPU_ZERO(&cpuset);
+ CPU_SET(proc_set_[thread_id], &cpuset);
+ pthread_t thread = pthread_self();
+
+ int s = pthread_setaffinity_np(thread, sizeof(cpu_set_t), &cpuset);
+ if (s != 0) {
+ std::cout << "NonBlockingThreadPool: Failed to set thread affinity\n";
+ return;
+ }
+
+ /* Check the actual affinity mask assigned to the thread */
+ s = pthread_getaffinity_np(thread, sizeof(cpu_set_t), &cpuset);
+ if (s != 0)
+ std::cout << "NonBlockingThreadPool: Failed to call pthread_getaffinity_np\n";
+ else {
+ for (int j = 0; j < CPU_SETSIZE; j++)
+ if (CPU_ISSET(j, &cpuset))
+ thread_bound_[thread_id] = j;
+ }
+
+ char buf[1024];
+ sprintf(buf, "eigen worker %d", thread_id);
+ pthread_setname_np(thread, buf);
+ }
+
PerThread* pt = GetPerThread();
pt->pool = this;
pt->rand = std::hash<std::thread::id>()(std::this_thread::get_id());
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment