|
#include <Rcpp.h> |
|
#include <thread> |
|
#include <future> |
|
|
|
using namespace Rcpp; |
|
|
|
// (1-based index of the first row seen, number of rows seen) for one hash value.
using Pair = std::pair<int, int>;

// Maps a row hash to its Pair of (first index, count).
using Map = std::unordered_map<int, Pair>;
|
// Timer specialisation driven by std::chrono::high_resolution_clock.
using HighresTimer = Rcpp::Timer<std::chrono::high_resolution_clock>;
|
|
|
class hash_task { |
|
public: |
|
hash_task( std::vector<int>& hashes_, IntegerMatrix& m_, Map& map_ ) : |
|
hashes(hashes_), m(m_), map(map_){} |
|
|
|
void operator()( int start, int end){ |
|
int nc = m.ncol() ; |
|
// get hashes for the range start:end |
|
for( int k=0, pow=1; k<nc; k++, pow*=2){ |
|
int* data = m.column(k).begin() ; |
|
auto target = hashes.begin() + start; |
|
|
|
std::transform( data+start, data+end, target, target, [=]( int v, int h ){ |
|
return h + pow*v ; |
|
}) ; |
|
} |
|
|
|
// train the map |
|
for( int i=start; i<end; i++){ |
|
Pair& p = map[ hashes[i] ] ; |
|
if( p.first == 0){ |
|
p.first = i+1 ; // using directly 1-based index |
|
} |
|
p.second++ ; |
|
} |
|
|
|
} |
|
|
|
private: |
|
std::vector<int>& hashes ; |
|
IntegerMatrix& m ; |
|
Map& map ; |
|
} ; |
|
|
|
// Folds the tallies of `x` into `host`.
//
// For a hash present in both maps the count from `x` is added to the entry in
// `host` (which keeps its own first-row index) and the entry is removed from
// `x`. Hashes found only in `x` are then copied into `host`. On return `x`
// holds exactly the entries that were not already in `host`.
void mergeMaps( Map& host, Map& x){
    Map::iterator cur = x.begin() ;
    while( cur != x.end() ){
        Map::iterator known = host.find( cur->first ) ;
        if( known == host.end() ){
            // not in host yet: leave it in x for the bulk insert below
            ++cur ;
            continue ;
        }
        known->second.second += cur->second.second ;
        cur = x.erase( cur ) ;
    }

    // whatever is left in x is new to host
    host.insert( x.begin(), x.end() ) ;
}
|
|
|
// [[Rcpp::export]] |
|
List rowCounts_3(IntegerMatrix x, int nthreads ) { |
|
int n = x.nrow() ; |
|
std::vector<int> hashes(n) ; |
|
|
|
std::vector<std::thread> threads(nthreads-1) ; |
|
std::vector<Map> maps(nthreads) ; |
|
int start=0, chunk_size = n / nthreads ; |
|
|
|
for( int i=0; i<nthreads-1; i++){ |
|
hash_task task(hashes, x, maps[i]) ; |
|
threads[i] = std::thread( std::move(task), start, start+chunk_size) ; |
|
start += chunk_size ; |
|
} |
|
|
|
hash_task(hashes, x, maps[nthreads-1] )(start, n) ; |
|
|
|
for( int i=0; i<nthreads-1; i++) { |
|
threads[i].join() ; |
|
} |
|
|
|
Map& master = maps[0] ; |
|
for( int i=1; i<nthreads; i++){ |
|
mergeMaps( master, maps[i] ); |
|
} |
|
|
|
int nres = master.size() ; |
|
IntegerVector idx(nres), counts(nres) ; |
|
auto it=master.begin() ; |
|
for( int i=0; i<nres; i++, ++it){ |
|
idx[i] = it->second.first ; |
|
counts[i] = it->second.second ; |
|
} |
|
|
|
return List::create( _["counts"] = counts, _["idx"] = idx); |
|
|
|
} |
|
|
|
|
|
|
|
|
|
// ------------------- instrumented version |
|
|
|
class instrumented_hash_task { |
|
public: |
|
instrumented_hash_task( std::vector<int>& hashes_, IntegerMatrix& m_, Map& map_, HighresTimer& timer_ ) : |
|
hashes(hashes_), m(m_), map(map_), timer(timer_) {} |
|
|
|
void operator()( int start, int end){ |
|
int nc = m.ncol() ; |
|
timer.step("start") ; |
|
// get hashes for the range start:end |
|
for( int k=0, pow=1; k<nc; k++, pow*=2){ |
|
int* data = m.column(k).begin() ; |
|
auto target = hashes.begin() + start; |
|
|
|
std::transform( data+start, data+end, target, target, [=]( int v, int h ){ |
|
return h + pow*v ; |
|
}) ; |
|
} |
|
timer.step("hash") ; |
|
|
|
// train the map |
|
for( int i=start; i<end; i++){ |
|
Pair& p = map[ hashes[i] ] ; |
|
if( p.first == 0){ |
|
p.first = i+1 ; // using directly 1-based index |
|
} |
|
p.second++ ; |
|
} |
|
timer.step( "train" ) ; |
|
} |
|
|
|
private: |
|
std::vector<int>& hashes ; |
|
IntegerMatrix& m ; |
|
Map& map ; |
|
HighresTimer& timer ; |
|
} ; |
|
|
|
// [[Rcpp::export]] |
|
List rowCounts_3_instr(IntegerMatrix x, int nthreads ) { |
|
int n = x.nrow() ; |
|
std::vector<int> hashes(n) ; |
|
|
|
std::vector<std::thread> threads(nthreads-1) ; |
|
std::vector<Map> maps(nthreads) ; |
|
std::vector<HighresTimer> timers = HighresTimer::get_timers(nthreads) ; |
|
int start=0, chunk_size = n / nthreads ; |
|
|
|
for( int i=0; i<nthreads-1; i++){ |
|
instrumented_hash_task task(hashes, x, maps[i], timers[i+1]) ; |
|
threads[i] = std::thread( std::move(task), start, start+chunk_size) ; |
|
start += chunk_size ; |
|
} |
|
timers[0].step( "dispatch" ) ; |
|
|
|
instrumented_hash_task(hashes, x, maps[nthreads-1], timers[0] )(start, n) ; |
|
|
|
for( int i=0; i<nthreads-1; i++) { |
|
threads[i].join() ; |
|
} |
|
timers[0].step( "join" ) ; |
|
|
|
Map& master = maps[0] ; |
|
for( int i=1; i<nthreads; i++){ |
|
mergeMaps( master, maps[i] ); |
|
timers[0].step( "merge" ) ; |
|
} |
|
|
|
int nres = master.size() ; |
|
IntegerVector idx(nres), counts(nres) ; |
|
auto it=master.begin() ; |
|
for( int i=0; i<nres; i++, ++it){ |
|
idx[i] = it->second.first ; |
|
counts[i] = it->second.second ; |
|
} |
|
timers[0].step( "collect" ) ; |
|
|
|
return List::create( _["counts"] = counts, _["idx"] = idx, _["timers"] = wrap(timers) ); |
|
|
|
} |
|
|
|
|