Skip to content

Instantly share code, notes, and snippets.

@traversc
Last active September 16, 2021 05:34
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save traversc/8b26b9e689b23d3a174aee296c0503ca to your computer and use it in GitHub Desktop.
Save traversc/8b26b9e689b23d3a174aee296c0503ca to your computer and use it in GitHub Desktop.
Rcpp Simple Progress
#include <atomic>
#include <thread>
#include <Rcpp.h>
#include <R_ext/Print.h>
using namespace Rcpp;
/// Minimal text progress bar for long-running Rcpp loops.
///
/// The constructor prints a 51-character ruler to the R error stream; as work
/// is reported through increment(), asterisks are printed underneath it until
/// the row of asterisks is as wide as the ruler.
///
/// Any thread may call increment(), but only the thread that constructed the
/// object prints from increment(); worker threads merely bump the atomic
/// counter. The destructor prints unconditionally, so the object is expected
/// to be destroyed on the thread that created it.
class simple_progress {
private:
  const size_t max;             // amount of work that corresponds to a full bar
  std::atomic<size_t> counter;  // work reported so far, shared by all threads
  size_t current_ticks;         // asterisks already printed; only touched while printing
  std::thread::id main_thread;  // id of the constructing thread
  static constexpr double max_ticks = 51;  // width of the ruler printed by the constructor
public:
  /// Prints the ruler and remembers the calling thread as the printing thread.
  /// @param max total number of work units expected to be reported.
  simple_progress(const size_t max)
      : max(max), counter(0), current_ticks(0), main_thread(std::this_thread::get_id()) {
    REprintf("|----|----|----|----|----|----|----|----|----|----|\n");
    R_FlushConsole();
  }

  /// Reports `n` finished work units.
  ///
  /// The counter is updated atomically from any thread. The bar is redrawn
  /// only when the caller is the constructing thread.
  /// @param n number of work units completed by this call.
  /// @return the counter value as it was before this call's units were added.
  size_t increment(size_t n = 1) {
    const size_t previous = counter.fetch_add(n);
    if (std::this_thread::get_id() == main_thread) {
      // Draw the count that includes this call's own units so the bar does
      // not lag behind by `n`.
      print(previous + n);
    }
    return previous;
  }

  /// Extends the bar so that it reflects `n` completed units out of `max`.
  ///
  /// The bar only ever grows: a value of `n` that maps to fewer asterisks than
  /// are already on screen prints nothing. Not synchronized; call it from one
  /// thread at a time.
  /// @param n number of completed work units to display.
  void print(size_t n) {
    const size_t full_bar = static_cast<size_t>(max_ticks);
    size_t new_ticks = full_bar;
    // `n >= max` also covers max == 0, which would otherwise divide by zero,
    // and over-reporting, which would otherwise draw past the ruler.
    if (n < max) {
      new_ticks = static_cast<size_t>(
          static_cast<double>(n) / static_cast<double>(max) * max_ticks);
    }
    // Compare before subtracting: the tick counts are unsigned, so a smaller
    // new value must not be allowed to wrap around.
    if (new_ticks > current_ticks) {
      for (size_t i = current_ticks; i < new_ticks; ++i) REprintf("*");
      current_ticks = new_ticks;
      // Flush only when something was actually written.
      R_FlushConsole();
    }
  }

  // remove copy constructor/assignment and move constructor/assignment(implicit)
  simple_progress & operator=(const simple_progress&) = delete;
  simple_progress(const simple_progress&) = delete;

  /// Draws whatever progress is still missing and terminates the line.
  /// Members are destroyed automatically afterwards, so no destructor is
  /// invoked by hand here.
  ~simple_progress() {
    print(counter.load());
    REprintf("\n");
    R_FlushConsole();
  }
};
#include <Rcpp.h>
#include <atomic>
#include <omp.h>
// [[Rcpp::plugins(openmp)]]
#include "simple_progress/simple_progress.h"
using namespace Rcpp;
// [[Rcpp::plugins(cpp11)]]
// OpenMP demo for simple_progress: adds up the integers 0 .. n-1 (n = 1e8)
// into a shared atomic from a dynamically scheduled parallel loop, reporting
// one unit of progress per iteration.
//
// nthreads: requested number of OpenMP threads; values below 1 are treated as 1.
// Returns the accumulated sum converted to double.
// [[Rcpp::export]]
double test_progress(int nthreads) {
  const size_t n = 100000000;
  // A thread count must be positive; fall back to a single thread rather than
  // handing OpenMP an invalid request.
  const int threads = nthreads > 0 ? nthreads : 1;
  simple_progress prog(n);
  std::atomic<uint64_t> output{0};
  // The num_threads clause limits the setting to this one region instead of
  // changing the thread count for every later parallel region in the process.
#pragma omp parallel for schedule(dynamic) num_threads(threads)
  for (size_t i = 0; i < n; ++i) {
    output.fetch_add(i);
    prog.increment();
  }
  return static_cast<double>(output.load());
}
/*** R
test_progress(4)
*/
#include <Rcpp.h>
#include <RcppParallel.h>
#include "simple_progress/simple_progress.h"
using namespace Rcpp;
using namespace RcppParallel;
// [[Rcpp::depends(RcppParallel)]]
// [[Rcpp::plugins(cpp11)]]
// Worker for the RcppParallel demo: every index of the assigned range is added
// to a shared atomic sum and reported to the progress bar as one unit of work.
struct TestWorker : public Worker {
  // Running sum of all indices processed so far; read by the caller afterwards.
  std::atomic<uint64_t> output;
  // Progress bar owned by the caller; it must outlive this worker.
  simple_progress & prog;

  // output: starting value of the sum.
  // prog:   progress bar that receives one increment per processed index.
  TestWorker(uint64_t output, simple_progress & prog)
      : output(output),
        prog(prog) {}

  // Handles the half-open index range [begin, end).
  void operator()(std::size_t begin, std::size_t end) {
    std::size_t idx = begin;
    while (idx < end) {
      output.fetch_add(idx);
      prog.increment();
      ++idx;
    }
  }
};
// [[Rcpp::export]]
double test_progress(int nthreads) {
size_t n = 100000000;
simple_progress prog(n);
TestWorker w(0, prog);
parallelFor(0, n, w, 1, nthreads);
return static_cast<double>(w.output.load());
}
/*** R
test_progress(4)
*/
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment