Skip to content

Instantly share code, notes, and snippets.

@romainfrancois
Created January 9, 2014 13:54
Show Gist options
  • Save romainfrancois/8334411 to your computer and use it in GitHub Desktop.
Save romainfrancois/8334411 to your computer and use it in GitHub Desktop.
A simple implementation of R's `which()` using C++11 threads (via Rcpp). Benchmark results against base R:
> bench(1000)
Unit: microseconds
expr min lq median uq max neval
which(b) 2.104 4.548 5.1000 6.0955 18.306 100
which_cpp_nthreads(b, 2) 26.401 28.303 28.9285 30.0625 103.980 100
which_cpp_nthreads(b, 4) 45.902 49.494 50.6335 52.1155 115.184 100
which_cpp_nthreads(b, 8) 97.596 100.143 101.3720 104.9895 270.521 100
> bench(1e+05)
Unit: microseconds
expr min lq median uq max neval
which(b) 387.247 400.1215 418.9635 664.8670 17795.71 100
which_cpp_nthreads(b, 2) 280.149 315.4075 337.2355 437.7310 27336.24 100
which_cpp_nthreads(b, 4) 226.678 264.1730 285.7150 342.5565 17886.84 100
which_cpp_nthreads(b, 8) 280.099 310.4090 350.5835 428.6745 17681.50 100
> bench(1e+07)
Unit: milliseconds
expr min lq median uq max neval
which(b) 42.74872 43.91283 49.14603 54.41899 89.68374 100
which_cpp_nthreads(b, 2) 24.07307 28.57485 31.69920 39.01933 64.15361 100
which_cpp_nthreads(b, 4) 14.64242 21.81248 23.60718 29.60597 71.86710 100
which_cpp_nthreads(b, 8) 13.52122 17.26611 20.50603 29.84738 54.59843 100
#include <Rcpp.h>

#include <climits>
#include <future>
#include <thread>

using namespace Rcpp;
// Scan the half-open range [begin, end) of R logical values (stored as int)
// and write the 1-based position of every TRUE element into `out`, in
// increasing order.
//
// begin, end : range to scan
// index      : 0-based position of *begin within the whole vector; the
//              position written for *begin is index + 1
// out        : destination buffer; must have room for (end - begin) ints
// returns    : number of positions written to `out`
//
// R stores NA in logical vectors as NA_INTEGER (INT_MIN), which is non-zero,
// so a plain truthiness test would report NA as TRUE. Base R's which() omits
// NA, so only elements equal to 1 (R's TRUE) are selected.
int process( int* begin, int* end, int index, int* out){
    int count = 0;
    for (; begin < end; ++begin) {
        ++index;
        if (*begin == 1) {
            out[count++] = index;
        }
    }
    return count;
}
// Move-only wrapper for a callable with process()'s signature
// (range begin, range end, 0-based offset, output buffer) -> match count;
// the int result is retrieved through the std::future from get_future().
using Task = std::packaged_task<int(int*, int*, int, int*)>;
// Parallel counterpart of R's which() for a logical vector.
//
// The input is split into `nthreads` contiguous chunks. The first
// nthreads - 1 chunks are scanned by worker threads via process(), and the
// last (possibly larger) chunk is scanned on the calling thread. The
// per-chunk matches are then concatenated in chunk order, so the result is
// the increasing sequence of 1-based positions reported by process().
//
// b        : logical vector to scan (long vectors are rejected, because
//            positions are returned as int)
// nthreads : total number of threads, including the calling one; must be
//            >= 1. Values larger than length(b) are reduced so that no
//            thread gets an empty chunk.
// returns  : integer vector of 1-based positions
//
// R API work (allocation, errors) only happens while no worker thread is
// running. The scratch buffer is allocated before the workers start, and the
// result is allocated after all of them have finished.
//[[Rcpp::export]]
IntegerVector which_cpp_nthreads(LogicalVector b, int nthreads){
    if (nthreads < 1) {
        stop("nthreads must be at least 1");
    }
    if (b.size() > INT_MAX) {
        // positions are returned as int, which cannot address a long vector
        stop("long vectors are not supported");
    }
    const int n = static_cast<int>(b.size());

    // Never use more threads than elements (but always at least one, so an
    // empty input still goes through the single-chunk path below).
    if (nthreads > n) {
        nthreads = (n > 0) ? n : 1;
    }

    const int n_workers  = nthreads - 1;
    const int chunk_size = n / nthreads;
    const int last_start = n_workers * chunk_size;  // first index of the last chunk

    // Chunk i writes its matches starting at scratch[i * chunk_size]. A chunk
    // can match at most as many elements as it holds, so regions never overlap.
    IntegerVector scratch = no_init(n);
    int* in  = b.begin();
    int* tmp = scratch.begin();
    std::vector<int> sizes(nthreads);

    {
        // The destructor of a future obtained from
        // std::async(std::launch::async, ...) blocks until its task finishes.
        // So every worker is joined before leaving this scope, including
        // when launching a later worker throws, and before `scratch` is released.
        std::vector<std::future<int>> futures;
        futures.reserve(n_workers);
        for (int i = 0; i < n_workers; i++) {
            const int start = i * chunk_size;
            futures.push_back(std::async(std::launch::async, process,
                                         in + start, in + start + chunk_size,
                                         start, tmp + start));
        }
        sizes[n_workers] = process(in + last_start, in + n, last_start, tmp + last_start);
        for (int i = 0; i < n_workers; i++) {
            sizes[i] = futures[i].get();
        }
    }

    int m = 0;
    for (int i = 0; i < nthreads; i++) {
        m += sizes[i];
    }

    IntegerVector res = no_init(m);
    int* out = res.begin();
    for (int i = 0; i < nthreads; i++) {
        const int* src = tmp + i * chunk_size;
        out = std::copy(src, src + sizes[i], out);
    }
    return res;
}
/*** R
library(microbenchmark)

# Compare base R's which() with the threaded C++ version on a random logical
# vector of length n. The C++ results are first checked against which(), so a
# fast-but-wrong implementation cannot "win" the benchmark.
# TRUE/FALSE are spelled out because T and F are ordinary, reassignable variables.
bench <- function(n){
  b <- sample(c(TRUE, FALSE), n, replace = TRUE)
  for (k in c(2, 4, 8)) {
    stopifnot(identical(which(b), which_cpp_nthreads(b, k)))
  }
  microbenchmark(
    which(b),
    which_cpp_nthreads(b, 2),
    which_cpp_nthreads(b, 4),
    which_cpp_nthreads(b, 8)
  )
}
bench(1000)
bench(100000)
bench(10000000)
*/
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment