Skip to content

Instantly share code, notes, and snippets.

@romainfrancois
Last active August 29, 2015 13:58
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save romainfrancois/10016972 to your computer and use it in GitHub Desktop.
Save romainfrancois/10016972 to your computer and use it in GitHub Desktop.
How should I count the number of unique rows in a 'binary' matrix?

Several implementations of the problem from this SO thread. http://stackoverflow.com/questions/22886040/how-should-i-count-the-number-of-unique-rows-in-a-binary-matrix

The counts_cpp2.cpp is similar to the one I detailed in my SO answer. The counts_cpp3.cpp goes further by splitting the job into 4 threads, each dealing with a subset of rows of the matrix.

The graph shows the times of both solutions.

  • The first line on the bottom corresponds to the sequential version. Red is hashing and blue is training the hash map
  • The other 4 lines above are the times of the 4 threads of the parallel solution. Black is when the main threads joins the orker threads, and orange is merging the hash maps.
library(attributes)
library(Rcpp11)
library(microbenchmark)
rowCountsR <- function(x) {
## calculate hash
h <- m %*% matrix(2^(0:(ncol(x)-1)), ncol=1)
i <- which(!duplicated(h))
counts <- tabulate(h+1)
counts[order(h[i])] <- counts
list(counts=counts, idx=i)
}
source("plot_timers.R")
sourceCpp( "counts_cpp1.cpp" )
sourceCpp( "counts_cpp2.cpp" )
sourceCpp( "counts_cpp3.cpp" )
m <- matrix( sample(0:1, 1e7, TRUE), ncol=10)
system.time( R <- rowCountsR(m) )
system.time( cpp1 <- rowCounts_1(m) )
system.time( cpp2 <- rowCounts_2(m) )
system.time( cpp3 <- rowCounts_3(m, 4) )
icpp2 <- list( timers = rowCounts_2_instr(m)$timers )
icpp3 <- list( timers = rowCounts_3_instr(m, 4)$timers )
colors <- c(
hash = "red",
train = "blue",
collect = "green",
dispatch = "black",
start = "lightgray",
join = "black",
merge = "orange"
)
plot_timers_compare(
list(cpp2 = icpp2, cpp3 = icpp3),
colors = colors )
#include <Rcpp.h>
using namespace Rcpp;
inline int hash(IntegerMatrix::Row x) {
int n = x.size();
int hash = 0;
for (int j=0; j < n; ++j) {
hash += x[j] << j;
}
return hash;
}
// [[Rcpp::export]]
List rowCounts_1(IntegerMatrix x) {
int nrow = x.nrow();
typedef std::map<int, int> map_t;
map_t counts;
// keep track of insertion order with a separate vector
std::vector<int> ordered_hashes;
std::vector<int> insertion_order;
ordered_hashes.reserve(nrow);
insertion_order.reserve(nrow);
for (int i=0; i < nrow; ++i) {
IntegerMatrix::Row row = x(i, _);
int hashed_row = hash(row);
if (!counts[hashed_row]) {
ordered_hashes.push_back(hashed_row);
insertion_order.push_back(i);
}
++counts[hashed_row];
}
// fill the 'counts' portion of the output
int n = counts.size();
IntegerVector output(n);
for (int i=0; i < n; ++i) {
output[i] = counts[ ordered_hashes[i] ];
}
// fill the 'idx' portion of the output
IntegerVector idx(n);
for (int i=0; i < n; ++i) {
idx[i] = insertion_order[i] + 1; // 0 to 1-based indexing
}
return List::create(
_["counts"] = output,
_["idx"] = idx
);
}
#include <Rcpp.h>
using namespace Rcpp;
using HighresTimer = Rcpp::Timer<std::chrono::high_resolution_clock> ;
// [[Rcpp::export]]
List rowCounts_2(IntegerMatrix x) {
int n = x.nrow() ;
int nc = x.ncol() ;
std::vector<int> hashes(n) ;
for( int k=0, pow=1; k<nc; k++, pow*=2){
IntegerMatrix::Column column = x.column(k) ;
std::transform( column.begin(), column.end(), hashes.begin(), hashes.begin(), [=]( int v, int h ){
return h + pow*v ;
}) ;
}
using Pair = std::pair<int,int> ;
std::unordered_map<int, Pair> map_counts ;
for( int i=0; i<n; i++){
Pair& p = map_counts[ hashes[i] ] ;
if( p.first == 0){
p.first = i+1 ; // using directly 1-based index
}
p.second++ ;
}
int nres = map_counts.size() ;
IntegerVector idx(nres), counts(nres) ;
auto it=map_counts.begin() ;
for( int i=0; i<nres; i++, ++it){
idx[i] = it->second.first ;
counts[i] = it->second.second ;
}
return List::create( _["counts"] = counts, _["idx"] = idx );
}
// [[Rcpp::export]]
List rowCounts_2_instr(IntegerMatrix x) {
HighresTimer timer ;
int n = x.nrow() ;
int nc = x.ncol() ;
std::vector<int> hashes(n) ;
for( int k=0, pow=1; k<nc; k++, pow*=2){
IntegerMatrix::Column column = x.column(k) ;
std::transform( column.begin(), column.end(), hashes.begin(), hashes.begin(), [=]( int v, int h ){
return h + pow*v ;
}) ;
}
timer.step("hash") ;
using Pair = std::pair<int,int> ;
std::unordered_map<int, Pair> map_counts ;
for( int i=0; i<n; i++){
Pair& p = map_counts[ hashes[i] ] ;
if( p.first == 0){
p.first = i+1 ; // using directly 1-based index
}
p.second++ ;
}
timer.step("train") ;
int nres = map_counts.size() ;
IntegerVector idx(nres), counts(nres) ;
auto it=map_counts.begin() ;
for( int i=0; i<nres; i++, ++it){
idx[i] = it->second.first ;
counts[i] = it->second.second ;
}
timer.step("collect") ;
return List::create( _["counts"] = counts, _["idx"] = idx, _["timers"] = List::create(timer) );
}
#include <Rcpp.h>
#include <thread>
#include <future>
using namespace Rcpp;
typedef std::pair<int, int> Pair;
typedef std::unordered_map<int, Pair> Map ;
typedef Rcpp::Timer<std::chrono::high_resolution_clock> HighresTimer;
class hash_task {
public:
hash_task( std::vector<int>& hashes_, IntegerMatrix& m_, Map& map_ ) :
hashes(hashes_), m(m_), map(map_){}
void operator()( int start, int end){
int nc = m.ncol() ;
// get hashes for the range start:end
for( int k=0, pow=1; k<nc; k++, pow*=2){
int* data = m.column(k).begin() ;
auto target = hashes.begin() + start;
std::transform( data+start, data+end, target, target, [=]( int v, int h ){
return h + pow*v ;
}) ;
}
// train the map
for( int i=start; i<end; i++){
Pair& p = map[ hashes[i] ] ;
if( p.first == 0){
p.first = i+1 ; // using directly 1-based index
}
p.second++ ;
}
}
private:
std::vector<int>& hashes ;
IntegerMatrix& m ;
Map& map ;
} ;
void mergeMaps( Map& host, Map& x){
for( auto& it: host){
auto y = x.find(it.first) ;
if( y != x.end() ){
it.second.second += y->second.second ;
x.erase(y);
}
}
host.insert( x.begin(), x.end() ) ;
}
// [[Rcpp::export]]
List rowCounts_3(IntegerMatrix x, int nthreads ) {
int n = x.nrow() ;
std::vector<int> hashes(n) ;
std::vector<std::thread> threads(nthreads-1) ;
std::vector<Map> maps(nthreads) ;
int start=0, chunk_size = n / nthreads ;
for( int i=0; i<nthreads-1; i++){
hash_task task(hashes, x, maps[i]) ;
threads[i] = std::thread( std::move(task), start, start+chunk_size) ;
start += chunk_size ;
}
hash_task(hashes, x, maps[nthreads-1] )(start, n) ;
for( int i=0; i<nthreads-1; i++) {
threads[i].join() ;
}
Map& master = maps[0] ;
for( int i=1; i<nthreads; i++){
mergeMaps( master, maps[i] );
}
int nres = master.size() ;
IntegerVector idx(nres), counts(nres) ;
auto it=master.begin() ;
for( int i=0; i<nres; i++, ++it){
idx[i] = it->second.first ;
counts[i] = it->second.second ;
}
return List::create( _["counts"] = counts, _["idx"] = idx);
}
// ------------------- instrumented version
class instrumented_hash_task {
public:
instrumented_hash_task( std::vector<int>& hashes_, IntegerMatrix& m_, Map& map_, HighresTimer& timer_ ) :
hashes(hashes_), m(m_), map(map_), timer(timer_) {}
void operator()( int start, int end){
int nc = m.ncol() ;
timer.step("start") ;
// get hashes for the range start:end
for( int k=0, pow=1; k<nc; k++, pow*=2){
int* data = m.column(k).begin() ;
auto target = hashes.begin() + start;
std::transform( data+start, data+end, target, target, [=]( int v, int h ){
return h + pow*v ;
}) ;
}
timer.step("hash") ;
// train the map
for( int i=start; i<end; i++){
Pair& p = map[ hashes[i] ] ;
if( p.first == 0){
p.first = i+1 ; // using directly 1-based index
}
p.second++ ;
}
timer.step( "train" ) ;
}
private:
std::vector<int>& hashes ;
IntegerMatrix& m ;
Map& map ;
HighresTimer& timer ;
} ;
// [[Rcpp::export]]
List rowCounts_3_instr(IntegerMatrix x, int nthreads ) {
int n = x.nrow() ;
std::vector<int> hashes(n) ;
std::vector<std::thread> threads(nthreads-1) ;
std::vector<Map> maps(nthreads) ;
std::vector<HighresTimer> timers = HighresTimer::get_timers(nthreads) ;
int start=0, chunk_size = n / nthreads ;
for( int i=0; i<nthreads-1; i++){
instrumented_hash_task task(hashes, x, maps[i], timers[i+1]) ;
threads[i] = std::thread( std::move(task), start, start+chunk_size) ;
start += chunk_size ;
}
timers[0].step( "dispatch" ) ;
instrumented_hash_task(hashes, x, maps[nthreads-1], timers[0] )(start, n) ;
for( int i=0; i<nthreads-1; i++) {
threads[i].join() ;
}
timers[0].step( "join" ) ;
Map& master = maps[0] ;
for( int i=1; i<nthreads; i++){
mergeMaps( master, maps[i] );
timers[0].step( "merge" ) ;
}
int nres = master.size() ;
IntegerVector idx(nres), counts(nres) ;
auto it=master.begin() ;
for( int i=0; i<nres; i++, ++it){
idx[i] = it->second.first ;
counts[i] = it->second.second ;
}
timers[0].step( "collect" ) ;
return List::create( _["counts"] = counts, _["idx"] = idx, _["timers"] = wrap(timers) );
}
plot_timers_compare <- function( timers_list , colors = c(
"data structures" = "black", "spawning" = "royalblue",
start = "lightgray", end = "darkgreen", fusion = "navyblue",
"temp" = "black", copy = "tomato", process = "darkgreen" ,
"allocate" = "black", "count" = "purple3", waiting = "gray"
), add.lines= TRUE, h = .8
){
timers <- lapply( timers_list, function(x) lapply( x[["timers"]], function(y) y ) )
xlim <- c(0, max(unlist(timers) ) )
ylim <- c(-1, length(timers) + sum(sapply(timers, length)) - 1 )
plot(0, type = "n", axes = FALSE, xlim = xlim, ylim = ylim, ann = FALSE )
usr <- par("usr")
rect( xleft = usr[1], xright = usr[2], ybottom = usr[3], ytop = usr[4], col = "lightgray" )
ticks <- axTicks(1)
plot_timers <- function( timers, pos ){
for(i in seq_along(timers)){
data <- timers[[i]]
rect(
ybottom = pos+i-1-h/2,
ytop = pos+i-1+h/2,
xleft = c(0,head(data, -1)),
xright = data,
col = colors[ match(names(data), names(colors)) ],
border = "lightgray", lwd =2 )
}
}
timers_names <- names(timers)
pos <- 0
for( i in seq_along(timers)){
data <- timers[[i]]
plot_timers( data, pos = pos)
if(length(timers)>1 && add.lines) abline( h = pos + length(data), lwd = 2)
pos <- pos + length(data) + 1
}
abline( v = ticks, col = "white")
axis(1, col = "lightgray" )
mtext( "time (micro seconds)", side = 1, line = 2 )
box()
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment