Skip to content

Instantly share code, notes, and snippets.

@tnagler
Last active November 23, 2018 00:45
Show Gist options
  • Save tnagler/369d6e36e6e6b375bac0a9c57840aca7 to your computer and use it in GitHub Desktop.
Save tnagler/369d6e36e6e6b375bac0a9c57840aca7 to your computer and use it in GitHub Desktop.
RcppThread benchmarks
---
title: "RcppThread benchmarks"
output: html_document
---
```{r setup, include = FALSE}
knitr::opts_chunk$set(
collapse = TRUE,
comment = "#>",
fig.width = 7,
fig.height = 5
)
```
```{r}
library(tidyverse)
library(microbenchmark)
if (!require(naglr)) {
if (!require(devtools))
install.packages("devtools")
devtools::install_github("tnagler/naglr")
}
library(naglr)
```
# Measuring synchronization overhead
## Create, join, and destruct threads
```{Rcpp}
// [[Rcpp::plugins(cpp11)]]
// [[Rcpp::depends(RcppThread)]]
#include <RcppThread.h>
// [[Rcpp::export]]
void stdEmpty(int nThreads)
{
std::vector<std::thread> threads;
for (size_t t = 0; t < nThreads; ++t) {
threads.emplace_back( [] {} );
}
for (size_t t = 0; t < nThreads; ++t) {
threads[t].join();
}
}
// [[Rcpp::export]]
void RcppThreadEmpty(int nThreads)
{
std::vector<RcppThread::Thread> threads;
for (size_t t = 0; t < nThreads; ++t) {
threads.emplace_back( [] {} );
}
for (size_t t = 0; t < nThreads; ++t) {
threads[t].join();
}
}
```
```{r}
timing <- tibble(
threads = c(1:4, seq.int(5, 50, l = 6)),
timings = map(
threads,
~ microbenchmark(
`std::thread` = stdEmpty(.),
`RcppThread::Thread` = RcppThreadEmpty(.),
times = 1000
)
)
) %>%
unnest()
```
```{r}
medians <- timing %>%
group_by(threads, expr) %>%
summarize(ms = median(time / 10^6)) %>%
ungroup()
```
```{r}
medians %>%
ggplot(aes(threads, ms, color = expr, linetype = expr)) +
geom_line(size = 0.8) +
theme_naglr(plot_title_face = "plain") +
scale_color_naglr() +
expand_limits(y = 0) +
labs(color = "", linetype = "") +
theme(legend.margin = margin(1, 1, 1, 1))
ggsave("benchEmptyThread.pdf", width = 5.5, height = 3)
```
```{r}
medians %>%
group_by(expr) %>%
arrange(threads) %>%
mutate(
mus = ms * 1000,
diff = (mus - lag(mus)) / (threads - lag(threads)),
oversubscribed = threads > parallel::detectCores()
) %>%
group_by(expr, oversubscribed) %>%
summarize(time_per_thread = mean(diff, na.rm = T)) %>%
spread(expr, time_per_thread) %>%
mutate(ratio = `RcppThread::Thread` / `std::thread`) %>%
knitr::kable(digits = 2)
```
## Checking for user interruptions
```{Rcpp}
// [[Rcpp::plugins(cpp11)]]
// [[Rcpp::depends(RcppThread)]]
#include <RcppThread.h>
// [[Rcpp::export]]
void interruptMain(int nChecks)
{
for (size_t i = 0; i < nChecks; ++i)
RcppThread::checkUserInterrupt();
}
// [[Rcpp::export]]
void interruptChild(int nChecks)
{
RcppThread::Thread thread([nChecks] {
for (size_t i = 0; i < nChecks; ++i)
RcppThread::checkUserInterrupt();
});
thread.join();
}
```
```{r}
timing <- tibble(
interruptions = seq.int(0, 10^4, l = 10),
timings = map(
interruptions,
~ microbenchmark(
`main thread` = interruptMain(.),
`child thread` = interruptChild(.),
times = 1000
)
)
) %>%
unnest()
```
```{r}
medians <- timing %>%
group_by(interruptions, expr) %>%
summarize(ms = median(time / 10^6))
```
```{r}
medians %>%
ggplot(aes(interruptions, ms, color = expr, linetype = expr)) +
geom_line(size = 0.8) +
theme_naglr() +
scale_color_naglr() +
expand_limits(y = 0) +
labs(linetype = "called from", color = "called from")
ggsave("benchInterrupt.pdf", width = 5.5, height = 3)
```
```{r}
medians %>%
group_by(expr) %>%
arrange(interruptions) %>%
mutate(diff = (ms - lag(ms)) / (interruptions - lag(interruptions))) %>%
summarize(ns = mean(diff, na.rm = TRUE) * 10^6) %>%
knitr::kable(digits = 0)
```
# Benchmarking against other libraries
## Empty jobs
```{Rcpp}
// [[Rcpp::plugins(cpp11)]]
// [[Rcpp::plugins(openmp)]]
// [[Rcpp::depends(RcppThread)]]
// [[Rcpp::depends(RcppParallel)]]
#include <Rcpp.h>
#include <omp.h>
#include <RcppThread.h>
#include <RcppParallel.h>
// [[Rcpp::export]]
void singleThreaded(int n)
{
for (size_t i = 0; i < n; ++i) ;
}
// [[Rcpp::export]]
void ThreadPool(int n)
{
RcppThread::ThreadPool pool;
for (size_t i = 0; i < n; ++i)
pool.push([] {});
pool.join();
}
// [[Rcpp::export]]
void parallelFor(int n)
{
RcppThread::parallelFor(0, n, [] (int i) {});
}
// [[Rcpp::export]]
void OpenMP(int n)
{
omp_set_num_threads(std::thread::hardware_concurrency());
#pragma omp parallel for
for (size_t i = 0; i < n; ++i) ;
}
struct EmptyJob : public RcppParallel::Worker
{
void operator()(std::size_t begin, std::size_t end) {
for (size_t i = begin; i < end; i++) ;
}
};
// [[Rcpp::export]]
void RcppParallelFor(int n)
{
EmptyJob job{};
RcppParallel::parallelFor(0, n, job);
}
```
```{r}
timing <- tibble(
jobs = seq.int(0, 10^4, l = 10),
timings = map(
jobs,
~ microbenchmark(
`single threaded` = singleThreaded(.),
ThreadPool = ThreadPool(.),
parallelFor = parallelFor(.),
OpenMP = OpenMP(.),
RcppParallel = RcppParallelFor(.),
times = 1000
)
)
) %>%
unnest()
```
```{r}
medians <- timing %>%
group_by(jobs, expr) %>%
summarize(ms = median(time / 10^6))
```
```{r}
medians %>%
ggplot(aes(jobs, ms, color = expr, linetype = expr)) +
geom_line(size = 0.8) +
theme_naglr() +
scale_color_naglr() +
expand_limits(y = 0) +
labs(linetype = "", color = "") +
scale_y_log10()
ggsave("benchEmpty.pdf", width = 6, height = 3)
```
## Kernel density estimates
```{Rcpp}
// [[Rcpp::plugins(cpp11)]]
// [[Rcpp::plugins(openmp)]]
// [[Rcpp::depends(RcppThread)]]
// [[Rcpp::depends(RcppParallel)]]
// [[Rcpp::depends(RcppEigen)]]
#include <Rcpp.h>
#include <omp.h>
#include <RcppThread.h>
#include <RcppParallel.h>
#include <Eigen/Dense>
using namespace Eigen;
VectorXd kernel(const VectorXd& x)
{
VectorXd k(x.size());
for (size_t i = 0; i < x.size(); ++i) {
if (std::abs(x(i)) > 1)
k(i) = 0;
else
k(i) = 0.75 * (1 - std::pow(x(i), 2));
}
return k;
}
VectorXd kde(const VectorXd& x)
{
double n = x.size();
double sd = std::sqrt((x.array() - x.mean()).square().sum() / (n - 1));
double bw = 1.06 * sd * std::pow(n, -0.2);
VectorXd grid = VectorXd::LinSpaced(500, -1, 1);
VectorXd fhat(grid.size());
for (size_t i = 0; i < grid.size(); ++i) {
fhat(i) = kernel((x.array() - grid(i)) / bw).mean() / bw;
}
return fhat;
}
// [[Rcpp::export]]
void singleThreaded(int n, int d)
{
MatrixXd x = MatrixXd(n, d).setRandom();
for (size_t i = 0; i < d; ++i)
kde(x.col(i));
}
// [[Rcpp::export]]
void ThreadPool(int n, int d)
{
MatrixXd x = MatrixXd(n, d).setRandom();
RcppThread::ThreadPool pool;
for (size_t i = 0; i < d; ++i)
pool.push([&, i] { kde(x.col(i)); });
pool.join();
}
// [[Rcpp::export]]
void parallelFor(int n, int d)
{
MatrixXd x = MatrixXd(n, d).setRandom();
RcppThread::parallelFor(0, d, [&] (size_t i) {
kde(x.col(i));
});
}
// [[Rcpp::export]]
void OpenMP(int n, int d)
{
MatrixXd x = MatrixXd(n, d).setRandom();
omp_set_num_threads(std::thread::hardware_concurrency());
#pragma omp parallel for
for (size_t i = 0; i < d; ++i) {
kde(x.col(i));
RcppThread::checkUserInterrupt();
}
}
struct KDEJob : public RcppParallel::Worker
{
KDEJob(const MatrixXd& x) : x_(x) {}
void operator()(size_t begin, size_t end) {
for (size_t i = begin; i < end; i++) {
RcppThread::checkUserInterrupt();
kde(x_.col(i));
}
}
const MatrixXd& x_;
};
// [[Rcpp::export]]
int RcppParallelFor(int n, int d)
{
MatrixXd x = MatrixXd(n, d).setRandom();
KDEJob job(x);
RcppParallel::parallelFor(0, d, job);
return 1;
}
```
```{r}
timing <- crossing(
n = c(10, 100, 500, 1000),
d = c(10, 100)
) %>%
mutate(
timings = map2(
n, d,
~ microbenchmark(
`single threaded` = singleThreaded(.x, .y),
ThreadPool = ThreadPool(.x, .y),
parallelFor = parallelFor(.x, .y),
OpenMP = OpenMP(.x, .y),
RcppParallel = RcppParallelFor(.x, .y),
times = 1000
)
)
) %>%
unnest()
```
```{r}
medians <- timing %>%
group_by(n, d, expr) %>%
summarize(ms = median(time / 10^6)) %>%
ungroup()
```
```{r}
medians %>%
mutate(d = str_c("d = ", d)) %>%
ggplot(aes(n, ms, color = expr, linetype = expr)) +
facet_wrap(~ d, scales = "free_y") +
geom_line(size = 0.8) +
theme_naglr() +
scale_color_naglr() +
expand_limits(y = 0) +
labs(linetype = "", color = "") +
xlab("sample size")
ggsave("benchKDE.pdf", width = 8, height = 3)
```
## Kendall correlation matrix
```{Rcpp}
// [[Rcpp::plugins(cpp11)]]
// [[Rcpp::plugins(openmp)]]
// [[Rcpp::depends(RcppThread)]]
// [[Rcpp::depends(RcppParallel)]]
// [[Rcpp::depends(wdm)]]
// [[Rcpp::depends(RcppEigen)]]
#include <wdm/eigen.hpp>
#include <omp.h>
#include <RcppThread.h>
#include <RcppParallel.h>
#include <RcppEigen.h>
using namespace Eigen;
void computeKendall(size_t i, size_t j, const MatrixXd& mat, MatrixXd& cor)
{
cor(i, j) = wdm::wdm(mat.col(i), mat.col(j), "kendall");
cor(j, i) = cor(i, j);
}
// [[Rcpp::export]]
void singleThreaded(int n, int d)
{
MatrixXd mat = MatrixXd(n, d).setRandom();
MatrixXd cor = MatrixXd::Identity(d, d);
for (size_t i = 0; i < d; ++i) {
for (size_t j = i + 1; j < d; ++j) {
computeKendall(i, j, mat, cor);
}
}
}
// [[Rcpp::export]]
void ThreadPool(int n, int d)
{
MatrixXd mat = MatrixXd(n, d).setRandom();
MatrixXd cor = MatrixXd::Identity(d, d);
RcppThread::ThreadPool pool;
for (size_t i = 0; i < d; ++i) {
for (size_t j = i + 1; j < d; ++j) {
pool.push([&, i, j] { computeKendall(i, j, mat, cor); });
}
}
pool.join();
}
// [[Rcpp::export]]
void parallelFor(int n, int d)
{
MatrixXd mat = MatrixXd(n, d).setRandom();
MatrixXd cor = MatrixXd::Identity(d, d);
RcppThread::parallelFor(0, d, [&] (size_t i) {
for (size_t j = i + 1; j < d; ++j) {
computeKendall(i, j, mat, cor);
}
});
}
// [[Rcpp::export]]
void OpenMP(int n, int d)
{
MatrixXd mat = MatrixXd(n, d).setRandom();
MatrixXd cor = MatrixXd::Identity(d, d);
omp_set_num_threads(std::thread::hardware_concurrency());
#pragma omp parallel for
for (size_t i = 0; i < d; ++i) {
for (size_t j = i + 1; j < d; ++j) {
computeKendall(i, j, mat, cor);
}
}
}
struct CorJob : public RcppParallel::Worker
{
CorJob(const MatrixXd& mat, MatrixXd& cor)
: mat_(mat), cor_(cor), d_(cor.cols()) {}
void operator()(size_t begin, size_t end) {
for (size_t i = begin; i < end; i++) {
for (size_t j = i + 1; j < d_; ++j) {
computeKendall(i, j, mat_, cor_);
}
}
}
const MatrixXd& mat_;
MatrixXd& cor_;
size_t d_;
};
// [[Rcpp::export]]
void RcppParallelFor(int n, int d)
{
MatrixXd mat = MatrixXd(n, d).setRandom();
MatrixXd cor = MatrixXd::Identity(d, d);
CorJob job(mat, cor);
RcppParallel::parallelFor(0, d, job);
}
```
```{r}
timing <- crossing(
n = c(10, 100, 500, 1000),
d = c(10, 100)
) %>%
mutate(
timings = map2(
n, d,
~ microbenchmark(
`single threaded` = singleThreaded(.x, .y),
ThreadPool = ThreadPool(.x, .y),
parallelFor = parallelFor(.x, .y),
OpenMP = OpenMP(.x, .y),
RcppParallelFor = RcppParallelFor(.x, .y),
times = 100
)
)
) %>%
unnest()
```
```{r}
medians <- timing %>%
group_by(n, d, expr) %>%
summarize(ms = median(time / 10^6)) %>%
ungroup()
```
```{r}
medians %>%
mutate(d = str_c("d = ", d)) %>%
ggplot(aes(n, ms, color = expr, linetype = expr)) +
facet_wrap(~ d, scales = "free_y") +
geom_line(size = 0.8) +
theme_naglr() +
scale_color_naglr() +
expand_limits(y = 0) +
labs(linetype = "", color = "") +
xlab("sample size")
ggsave("benchKendall.pdf", width = 8, height = 3)
```
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment