Skip to content

Instantly share code, notes, and snippets.

@TheConstructor
Last active August 29, 2015 14:23
Show Gist options
  • Save TheConstructor/5a29ecba757b308b690d to your computer and use it in GitHub Desktop.
Save TheConstructor/5a29ecba757b308b690d to your computer and use it in GitHub Desktop.
sample NBody-Implementation using HPX
cmake_minimum_required(VERSION 3.0)
# this project is c++ based
project(exa_2015 CXX)
set(CMAKE_VERBOSE_MAKEFILE "ON")
# Instruct cmake to find and include the HPX settings and CMake-macros
find_package(HPX REQUIRED)
SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++14 -ftemplate-backtrace-limit=0")
add_hpx_executable(nbody_test3
ESSENTIAL
SOURCES nbody_test3.cpp
COMPONENT_DEPENDENCIES iostreams vector
)
//
// Created by Matthias Vill on 11.06.15.
//
#ifndef EXA_2015_INTERACT_REDUCE_HPP
#define EXA_2015_INTERACT_REDUCE_HPP
// 1=Iterate over the future and apply reduction in numerical order locally
// 2=Create a queue with the initial value, then use wait_each on the futures
// 3=Create a queue with the initial value, call then on all intermediate futures
#define EXA_2015_INTERACT_REDUCE_MODE 3
#include <hpx/include/vector.hpp>
#include <hpx/include/lcos.hpp>
#include <hpx/include/parallel_algorithm.hpp>
#include <boost/function.hpp>
#include <boost/range/counting_range.hpp>
#include <hpx/include/iostreams.hpp>
namespace ess {
namespace interact_reduce {
using hpx::util::make_zip_iterator;
namespace detail {
#if EXA_2015_INTERACT_REDUCE_MODE > 1
template<typename reduce_action, typename Intermediate, typename Reduction>
struct reduce_helper {
private:
mutable hpx::lcos::queue<Reduction> m_queue;
friend class hpx::serialization::access;
// When the class Archive corresponds to an output archive, the
// & operator is defined similar to <<. Likewise, when the class Archive
// is a type of input archive the & operator is defined similar to >>.
template<class Archive>
void serialize(Archive &ar, const unsigned int version) {
ar &m_queue;
}
public:
reduce_helper(hpx::lcos::queue<Reduction> queue) : m_queue(queue) {
}
void operator()(hpx::future<Intermediate> next) const {
Intermediate nextValue = next.get();
reduce_action m_reduce;
Reduction val = m_reduce(hpx::find_here(), m_queue.get_value_sync(), nextValue);
m_queue.set_value(val);
}
};
#endif
template<typename interact_action, typename InputData, typename Intermediate>
struct interact_helper2 {
private:
const typename hpx::vector<InputData>::size_type m_left_idx;
const InputData m_left_data;
friend class hpx::serialization::access;
// When the class Archive corresponds to an output archive, the
// & operator is defined similar to <<. Likewise, when the class Archive
// is a type of input archive the & operator is defined similar to >>.
template<class Archive>
void serialize(Archive &ar, const unsigned int version) {
ar &m_left_idx;
ar &m_left_data;
}
public:
interact_helper2(const typename hpx::vector<InputData>::size_type left_idx,
const InputData left_data) : m_left_idx(left_idx), m_left_data(left_data) {
}
Intermediate operator()(hpx::future<InputData> right_data) const {
interact_action m_interact;
return m_interact(hpx::find_here(), m_left_data, right_data.get());
}
};
template<typename interact_action, typename init_reduce_action, typename reduce_action, typename InputData, typename Intermediate, typename Reduction>
struct interact_helper1 {
private:
const std::string m_input_name;
const std::string m_output_name;
friend class hpx::serialization::access;
// When the class Archive corresponds to an output archive, the
// & operator is defined similar to <<. Likewise, when the class Archive
// is a type of input archive the & operator is defined similar to >>.
template<class Archive>
void serialize(Archive &ar, const unsigned int version) {
ar &m_input_name;
ar &m_output_name;
}
public:
interact_helper1(const std::string &input_name, const std::string &output_name)
: m_input_name(input_name), m_output_name(output_name) {
}
void operator()(hpx::util::tuple<typename hpx::vector<InputData>::size_type, InputData> left) const {
hpx::vector<InputData> input = hpx::vector<InputData>();
hpx::vector<Reduction> output = hpx::vector<Reduction>();
hpx::future<void> input_future = input.connect_to(m_input_name);
hpx::future<void> output_future = output.connect_to(m_output_name);
interact_helper2<interact_action, InputData, Intermediate> helper2(hpx::util::get<0>(left), hpx::util::get<1>(left));
input_future.get();
#if EXA_2015_INTERACT_REDUCE_MODE == 3
std::vector<hpx::future<void>> futures = std::vector<hpx::future<void>>();
#else
std::vector<hpx::future<Intermediate>> futures = std::vector<hpx::future<Intermediate>>();
#endif
futures.reserve(input.size() - 1);
init_reduce_action m_initial;
Reduction reduction = m_initial(hpx::find_here(), hpx::util::get<1>(left));
#if EXA_2015_INTERACT_REDUCE_MODE > 1
hpx::lcos::queue<Reduction> queue = hpx::new_<hpx::lcos::queue<Reduction>>(hpx::find_here());
queue.set_value(reduction);
reduce_helper<reduce_action, Intermediate, Reduction> reduceHelper(queue);
#endif
for (typename hpx::vector<InputData>::size_type idx = 0; idx < input.size(); idx++) {
if (hpx::util::get<0>(left) != idx) {
#if EXA_2015_INTERACT_REDUCE_MODE == 3
futures.push_back(hpx::async(helper2, input.get_value(idx)).then(reduceHelper));
#else
futures.push_back(hpx::async(helper2, input.get_value(idx)));
#endif
}
}
#if EXA_2015_INTERACT_REDUCE_MODE == 3
hpx::lcos::wait_all(futures);
#elif EXA_2015_INTERACT_REDUCE_MODE == 2
hpx::lcos::wait_each(reduceHelper, futures);
#else
reduce_action m_reduce;
for (hpx::future<Intermediate> &future : futures) {
reduction = m_reduce(hpx::find_here(), reduction, future.get());
}
#endif
#if EXA_2015_INTERACT_REDUCE_MODE > 1
output_future.get();
output.set_value_sync(hpx::util::get<0>(left), queue.get_value_sync());
#else
output_future.get();
output.set_value_sync(hpx::util::get<0>(left), reduction);
#endif
}
};
}
template<typename interact_action, typename init_reduce_action, typename reduce_action, typename InputData=typename std::remove_reference<decltype(hpx::util::get<0>(typename interact_action::arguments_type()))>::type, typename Intermediate=typename interact_action::result_type, typename Reduction=typename reduce_action::result_type>
struct interact_reduce {
public:
interact_reduce() {
}
inline void apply(const std::string &input_name, const std::string &output_name) {
hpx::vector<InputData> input;
input.connect_to_sync(input_name);
auto range = boost::counting_range(static_cast<typename hpx::vector<InputData>::size_type>(0),
input.size());
detail::interact_helper1<interact_action, init_reduce_action, reduce_action, InputData, Intermediate, Reduction> helper1(input_name, output_name);
hpx::parallel::for_each(hpx::parallel::parallel_vector_execution_policy(),
make_zip_iterator(range.begin(), input.begin()),
make_zip_iterator(range.end(), input.end()),
helper1);
}
};
}
}
#endif //EXA_2015_INTERACT_REDUCE_HPP
//
// Created by Matthias Vill on 18.06.15.
//
#ifndef EXA_2015_NAME_GEN_HPP
#define EXA_2015_NAME_GEN_HPP
#include <iostream>
#include <random>
#include <functional> //for std::function
#include <algorithm> //for std::generate_n
#include <hpx/runtime/agas/interface.hpp>
namespace ess {
namespace util {
std::mt19937 init_mt19937() {
std::array<int, std::mt19937::state_size> seed_data;
std::random_device r;
std::generate_n(seed_data.data(), seed_data.size(), std::ref(r));
std::seed_seq seq(std::begin(seed_data), std::end(seed_data));
std::mt19937 eng(seq);
return eng;
}
std::mt19937 rng = init_mt19937();
// taken from http://stackoverflow.com/a/12468109/1266906
std::string random_string(size_t length) {
auto randchar = []() -> char {
const char charset[] =
"0123456789"
"ABCDEFGHIJKLMNOPQRSTUVWXYZ"
"abcdefghijklmnopqrstuvwxyz";
const size_t max_index = (sizeof(charset) - 1);
std::uniform_int_distribution<int> uid(0, max_index);
return charset[uid(rng)];
};
std::string str(length, 0);
std::generate_n(str.begin(), length, randchar);
return str;
}
std::string random_name(size_t length){
std::string name;
do {
name = random_string(length);
}while(hpx::agas::resolve_name_sync(name) != hpx::invalid_id);
return name;
}
}
}
#endif //EXA_2015_NAME_GEN_HPP
#include <iostream>
#include <hpx/hpx_init.hpp>
#include <hpx/include/bind.hpp>
#include <hpx/include/vector.hpp>
#include <hpx/include/actions.hpp>
#include <hpx/include/util.hpp>
#include <hpx/include/parallel_algorithm.hpp>
#include <hpx/include/parallel_numeric.hpp>
#include <hpx/include/iostreams.hpp>
#include <vector>
#include <limits>
#include <string>
#include <random>
#include <chrono>
#include <iostream>
#include <boost/archive/text_oarchive.hpp>
#include <boost/archive/text_iarchive.hpp>
#include "name_gen.hpp"
#include "interact_reduce.hpp"
using namespace std;
using namespace std::chrono;
using data_t = float;
template<typename T>
struct point3 {
private:
friend class hpx::serialization::access;
// When the class Archive corresponds to an output archive, the
// & operator is defined similar to <<. Likewise, when the class Archive
// is a type of input archive the & operator is defined similar to >>.
template<class Archive>
void serialize(Archive &ar, const unsigned int version) {
ar &x;
ar &y;
ar &z;
}
public:
point3(T x = 0.f, T y = 0.f, T z = 0.f) : x(x), y(y), z(z) { }
T x;
T y;
T z;
};
template<typename T>
struct particle {
private:
friend class hpx::serialization::access;
// When the class Archive corresponds to an output archive, the
// & operator is defined similar to <<. Likewise, when the class Archive
// is a type of input archive the & operator is defined similar to >>.
template<class Archive>
void serialize(Archive &ar, const unsigned int version) {
ar &pos;
ar &vel;
ar &mass;
}
public:
point3<T> pos;
point3<T> vel;
T mass;
};
const data_t G = -6.673e-11f;
const data_t dt = 3600.f;
#define __sq(x) ((x)*(x))
#define __cu(x) ((x)*(x)*(x))
HPX_REGISTER_VECTOR(particle<data_t>, particle_data_t);
///////////////////////////////////////////////////////////////////////////////
typedef hpx::lcos::queue<particle<data_t>> queue_type;
typedef hpx::components::managed_component<
hpx::lcos::server::queue<particle<data_t>>
> queue_of_particles_type;
HPX_REGISTER_DERIVED_COMPONENT_FACTORY(
queue_of_particles_type, queue_of_particles_type,
"hpx::lcos::base_lco_with_value<particle<data_t>, particle<data_t>>");
point3<data_t> interact(particle<data_t> left, particle<data_t> right);
HPX_PLAIN_ACTION(interact);
particle<data_t> init_reduce(particle<data_t> previous);
HPX_PLAIN_ACTION(init_reduce);
particle<data_t> reduce(particle<data_t> reduction, point3<data_t> a);
HPX_PLAIN_ACTION(reduce);
///////////////////////////////////////////////////////////////////////////////
int main(int argc, char *argv[]) {
// Configure application-specific options
boost::program_options::options_description
desc_commandline("Usage: " HPX_APPLICATION_STRING " [options]");
desc_commandline.add_options()
("particleCount",
boost::program_options::value<size_t>()->default_value(1000),
"Particle count")
("runs",
boost::program_options::value<unsigned>()->default_value(3),
"Runs");
// Initialize and run HPX
return hpx::init(desc_commandline, argc, argv);
}
int hpx_main(boost::program_options::variables_map &vm) {
// extract command line argument, i.e. fib(N)
// boost::uint64_t n = vm["n-value"].as<boost::uint64_t>();
unsigned runs = vm["runs"].as<unsigned>();
size_t particleCount = vm["particleCount"].as<size_t>();
hpx::cout << "performing " << runs << " runs on " << particleCount << " particles" << endl;
hpx::vector<particle<data_t>> particles(particleCount);
auto init = [](auto &particles) {
mt19937 rng;
uniform_real_distribution<data_t> rnd_pos(-200e9f, 200e9f);
uniform_real_distribution<data_t> rnd_mass(1e22, 1e24);
rng.seed(13122012);
for (size_t i = 0; i < particles.size(); ++i) {
particle<data_t> p;
p.pos.x = rnd_pos(rng);
p.pos.y = rnd_pos(rng);
p.pos.z = rnd_pos(rng);
p.mass = rnd_mass(rng);
particles.set_value(i, p);
}
};
hpx::cout << "initialization..." << flush;
const string input_name = ess::util::random_name(42);
particles.register_as_sync(input_name);
init(particles);
hpx::cout << "done!" << endl;
auto nbody = [](hpx::vector<particle<data_t>> &particles, std::string input_name) {
const string output_name = ess::util::random_name(42);
hpx::vector<particle<data_t>> results(particles.size());
results.register_as_sync(output_name);
ess::interact_reduce::interact_reduce<interact_action, init_reduce_action, reduce_action> skeleton;
skeleton.apply(input_name, output_name);
return results;
};
vector<double> timings;
hpx::cout << "executing..." << endl;
for (unsigned i = 0; i < runs; i++) {
auto start = high_resolution_clock::now();
nbody(particles, input_name);
auto end = high_resolution_clock::now();
auto duration = end - start;
auto t = duration_cast<milliseconds>(duration).count();
hpx::cout << "run " << i << ": " << t << " ms" << endl;
timings.push_back(t);
}
hpx::cout << "...done!" << endl;
hpx::cout << "\n" << "timings:" << endl;
numeric_limits<double> limits;
double tmin = limits.max();
double tmax = limits.min();
double tavg = 0.0;
for (auto t : timings) {
tavg += t;
tmin = min(tmin, t);
tmax = max(tmax, t);
}
hpx::cout << "min: " << tmin << " ms - max: " << tmax << " ms - avg: " << tavg / runs << " ms" << endl;
return hpx::finalize(); // Handles HPX shutdown
}
particle<data_t> init_reduce(particle<data_t> previous) {
previous.pos.x += previous.vel.x * dt;
previous.pos.y += previous.vel.y * dt;
previous.pos.z += previous.vel.z * dt;
return previous;
}
particle<data_t> reduce(particle<data_t> reduction, point3<data_t> a) {
reduction.pos.x += a.x * 0.5f * __sq(dt);
reduction.pos.y += a.y * 0.5f * __sq(dt);
reduction.pos.z += a.z * 0.5f * __sq(dt);
reduction.vel.x += a.x * dt;
reduction.vel.y += a.y * dt;
reduction.vel.z += a.z * dt;
return reduction;
}
point3<data_t> interact(particle<data_t> left, particle<data_t> right) {
point3<data_t> a, r;
// calculate distance
r.x = left.pos.x - right.pos.x;
r.y = left.pos.y - right.pos.y;
r.z = left.pos.z - right.pos.z;
data_t dist = __sq(r.x) + __sq(r.y) + __sq(r.z);
if (dist > 0.0f) {
// calculate force
data_t dr = 1.0f / sqrt(dist);
data_t force = G * right.mass * __cu(dr);
a.x += force * r.x;
a.y += force * r.y;
a.z += force * r.z;
}
return a;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment