Skip to content

Instantly share code, notes, and snippets.

@kirkshoop
Forked from film42/rxcpp_pi.cpp
Last active August 29, 2015 14:04
Show Gist options
  • Save kirkshoop/1ad82caa9cccd736b2d4 to your computer and use it in GitHub Desktop.
Save kirkshoop/1ad82caa9cccd736b2d4 to your computer and use it in GitHub Desktop.
pi series - Rxcpp and for loop
auto pi = [](int k) {
return ( k % 2 == 0 ? -4 : 4 ) / ( long double )( ( 2 * k ) - 1 );
};
auto total = rxcpp::observable<>::range(1, 10000000)
.map(pi)
.sum()
.as_blocking().last();
std::cout << "Pi: " << total << std::endl;
auto pi = [](int k) {
return ( k % 2 == 0 ? -4.0L : 4.0L ) / ( ( 2.0L * k ) - 1.0L );
};
// share an output thread across all the producer threads
auto outputthread = rxcpp::observe_on_one_worker(rxcpp::observe_on_new_thread().create_coordinator().get_scheduler());
// use all available hardware threads
auto total = rxcpp::observable<>::range(0, 19).
group_by(
[](int i) -> int {return i % std::thread::hardware_concurrency();},
[](int i){return i;}).
map(
[=](rxcpp::grouped_observable<int, int> onproc) {
// share a producer thread across all the ranges in this group
auto producerthread = rxcpp::observe_on_one_worker(rxcpp::observe_on_new_thread().create_coordinator().get_scheduler());
return onproc.
map(
[=](int index){
static const int chunk = 1000000;
auto first = (chunk * index) + 1;
auto last = chunk * (index + 1);
return rxcpp::observable<>::range(first, last, producerthread).
map(pi).
sum(). // each thread maps and reduces its contribution to the answer
as_dynamic();
}).
concat(). // only subscribe to one range at a time in this group.
observe_on(outputthread).
as_dynamic();
}).
merge().
sum(). // reduces the contributions from all the threads to the answer
as_blocking().
last();
std::cout << std::setprecision(16) << "Pi: " << total << std::endl;
Pi: 3.141592603589793
^- 8digit accuracy from 20000000 calls to pi
auto pi = [](int k) {
return ( k % 2 == 0 ? -4.0L : 4.0L ) / ( ( 2.0L * k ) - 1.0L );
};
// share an output thread across all the producer threads
auto outputthread = rxcpp::observe_on_one_worker(rxcpp::observe_on_new_thread().create_coordinator().get_scheduler());
// use all available hardware threads
auto total = rxcpp::observable<>::range(0, std::thread::hardware_concurrency() - 1).
map(
[=](int index){
// partition equally across threads
static const int chunk = 20000000 / std::thread::hardware_concurrency();
auto first = (chunk * index) + 1;
auto last = chunk * (index + 1);
return rxcpp::observable<>::range(first, last, rxcpp::observe_on_new_thread()).
map(pi).
sum(). // each thread maps and reduces its contribution to the answer
as_dynamic();
}).
observe_on(outputthread).
merge().
sum(). // reduces the contributions from all the threads to the answer
as_blocking().
last();
std::cout << std::setprecision(16) << "Pi: " << total << std::endl;
auto pi = [](int k) {
return ( k % 2 == 0 ? -4 : 4 ) / ( long double )( ( 2 * k ) - 1 );
};
long double total = 0;
for( int i = 1; i <= 1000000; ++i ) {
long double res = pi( i );
total += res;
}
std::cout << "Pi: " << total << std::endl;
@kirkshoop
Copy link
Author

the sum() shortcut should already exist, as_blocking().last() is a pending pull request - should be available shortly.
I changed the structure of the rx version to match the loop structure for easier comparison.

@film42
Copy link

film42 commented Aug 5, 2014

Seriously awesome work! Thank you!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment