#ifndef CHOLESKY_HPX_MPI_H
#define CHOLESKY_HPX_MPI_H
#include "matrixdist.h"
#include "matrix.h"
#include "blas_lapack.h"
#include "util.h"
//
#include "hpx/hpx.hpp"
#include "hpx_profiler.h"
//
#include <plugins/parcelport/parcelport_logging.hpp>
#include <hpx/lcos/local/detail/sliding_semaphore.hpp>
#include <hpx/runtime/threads/executors/guided_pool_executor.hpp>
//#define GUIDED_POOL_EXEC
#define GUIDED_POOL_SHIM
#ifdef RING_BCAST
#define colBcast colRingBcast
#define rowBcast rowRingBcast
#endif
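// ------------------------------------------------------------------------
// Runtime configuration flags. These are presumably set elsewhere (e.g. by the
// command-line option handling) before cholesky() is called. With
// GUIDED_POOL_SHIM defined, use_guided_executor selects at runtime whether the
// shim executors below schedule using the NUMA hint or behave as plain pool
// executors.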
static bool use_pools = false;
static bool use_scheduler = false;
static int mpi_threads = 1;
static int hp_queues = 1;
static bool use_guided_executor = false;
static int allocator_mode = 0;
static int num_queues = 1;
// ------------------------------------------------------------------------
// specialize the numa hint template for tasks taking one, two or three matrix arguments
namespace hpx { namespace threads { namespace executors
{
struct cholesky_tag {};
template <typename MatrixType>
struct HPX_EXPORT pool_numa_hint<MatrixType, cholesky_tag> {
// @TODO : remove the bitmap alloc and use a preallocated one from TLS
//
// The call operator () must return an int type
// The arguments must be const ref versions of the equivalent task arguments
// hardware check : int dom = threads::get_topology().get_numa_domain(address);
int operator()(const MatrixType& matrix) const {
int dom = matrix.get_numa();
// std::cout << "Numa hint 1 returning " << dom <<"\n";
return dom;
}
int operator()(const MatrixType& matrix1, const MatrixType& matrix2) const {
int dom = matrix2.get_numa();
// std::cout << "Numa hint 2 returning " << dom <<"\n";
return dom;
}
int operator()(const MatrixType& matrix1, const MatrixType& matrix2,
const MatrixType& matrix3) const
{
int dom = matrix3.get_numa();
// std::cout << "Numa hint 3 returning " << dom <<"\n";
return dom;
}
};
}}}
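// The guided executors created in cholesky() consult this hint: when a
// continuation is scheduled through a guided_pool_executor(_shim), the hint's
// operator() is invoked with const references to the task arguments and the
// returned NUMA domain is used to place the task close to the memory of the
// matrix block it operates on.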
using namespace hpx::threads::executors;
// ------------------------------------------------------------------------
template <typename T, typename C2D, typename Allocator=std::allocator<T>>
void cholesky(MatrixDist<T, C2D, Allocator>& a)
{
using CommType = const C2D*;
// setup guided executors for different task priorities on the default (matrix) pool
using matrix_hint_type = pool_numa_hint<Matrix<T>, cholesky_tag>;
#ifdef GUIDED_POOL_SHIM
guided_pool_executor_shim<matrix_hint_type> *matrix_HP;
guided_pool_executor_shim<matrix_hint_type> *matrix_normal;
if (use_guided_executor) {
matrix_HP = new guided_pool_executor_shim<matrix_hint_type>(true, "default",
hpx::threads::thread_priority_high);
matrix_normal = new guided_pool_executor_shim<matrix_hint_type>(true, "default",
hpx::threads::thread_priority_default);
}
else {
// setup executors for different task priorities on the default (matrix) pool
matrix_HP = new guided_pool_executor_shim<matrix_hint_type>(false, "default",
hpx::threads::thread_priority_high);
matrix_normal = new guided_pool_executor_shim<matrix_hint_type>(false, "default",
hpx::threads::thread_priority_default);
}
#else
pool_executor *matrix_HP;
pool_executor *matrix_normal;
matrix_HP = new pool_executor("default",
hpx::threads::thread_priority_high);
matrix_normal = new pool_executor("default",
hpx::threads::thread_priority_default);
#endif
// create an executor on the mpi pool
hpx::threads::scheduled_executor mpi_executor;
auto m_size = a.comm().size();
if (use_pools && (std::get<0>(m_size) * std::get<1>(m_size)) > 1) {
// setup mpi executor
mpi_executor = hpx::threads::executors::pool_executor("mpi");
std::cout << "\n[hpx_main] got mpi executor " << std::endl;
}
else {
// this should not be used, but is provided for safe fallback operation
mpi_executor = hpx::threads::executors::pool_executor("default");
}
util::check_square(a, "a");
util::check_square_block_dist(a, "a");
int n = a.m();
int nb = a.nb();
auto proc_grid_size = a.comm().size();
hpx::future<CommType> comm2D_ft = hpx::make_ready_future(&a.comm());
std::map<std::size_t, hpx::future<Matrix<T>>> block_ft;
size_t ld_block_ft = n;
for (int j = 0; j < n; j += nb) {
int jb = std::min(nb, n - j); // panel width
for (int i = 0; i < n; i += nb) {
int ib = std::min(nb, n - i); // panel height
auto index_l = a.localIndex(i, j);
if (index_l.isMine()) {
int sub_col = (a.ptr(index_l) - a.ptr()) / a.localMatrix().get_allocator().matrix_.leading_dim;
int sub_row = (a.ptr(index_l) - a.ptr()) % a.localMatrix().get_allocator().matrix_.leading_dim;
block_ft.emplace(i + ld_block_ft * j,
hpx::make_ready_future(Matrix<T>(ib, jb, a.ptr(index_l), a.ld(),
// @TODO - clean this up
std::bind(&Allocator::get_domain,
sub_col, sub_row,
a.localMatrix().get_allocator().matrix_.tile_dim,
a.localMatrix().get_allocator().matrix_.tiles_per_domain,
a.localMatrix().get_allocator().matrix_.num_domains,
4096)
)));
}
}
}
auto block_ft_acc = [&block_ft, ld_block_ft ](int i, int j) -> auto& {
return block_ft[i + ld_block_ft * j];
};
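// Each local tile of the matrix is wrapped in a (ready) future stored in
// block_ft, keyed by i + ld_block_ft * j. The tasks below chain continuations
// on these futures, so the data dependencies between the BLAS/LAPACK calls are
// expressed purely through future composition (.then / dataflow).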
// To limit the depth of the dependency tree we allow N iterations to unroll at a time
const int tasks = (hpx::get_num_worker_threads()*500)/((n/nb)*(n/nb));
const int semaphore_limit = std::max(2, tasks);
std::cout << "Semaphore limit set to " << semaphore_limit
<< " assuming tasks " << tasks << std::endl;
hpx::lcos::local::sliding_semaphore sem(semaphore_limit*nb);
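// Throttling sketch (assuming the usual sliding_semaphore semantics): the
// constructor argument is the maximum allowed distance between the values
// passed to signal() and wait(). Tasks of iteration k call sem.signal(k) and
// the main loop calls sem.wait(k) at the end of iteration k, so since k
// advances in steps of nb, the loop can only run ahead of the completed
// iterations by roughly semaphore_limit iterations.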
bool single_node = proc_grid_size.second<2;
for (int k = 0; k < n; k += nb) {
int kb = std::min(nb, n - k); // panel width
auto index_diag_l = a.localIndex(k, k);
hpx::shared_future<Matrix<T>> diag_block_sf;
std::map<int, hpx::shared_future<Matrix<T>>> panel_sf;
// Not all ranks participate in every iteration, so don't use the semaphore
// if we are not active - keep track of whether this rank signalled the semaphore
bool semaphore_used = false;
// If the diagonal block is on this node, factorize it.
if (index_diag_l.isMine()) {
auto taskname = profiler::createNameHP("HP Fact. Diag Block", k / nb);
diag_block_sf = block_ft_acc(k, k).then(
*matrix_HP,
[k, taskname, &sem, single_node](hpx::future<Matrix<T>>&& diag_block_ft)
{
profiler::TaskProfiler tp(taskname, "Panel");
int info = 0;
auto diag_block = diag_block_ft.get();
//
potrf("L", diag_block.m(), diag_block.ptr(), diag_block.ld(), &info);
if (info != 0) {
util::abort("xPOTRF of a(" + std::to_string(k) + ", " + std::to_string(k) +
") returned info = " + std::to_string(info));
}
if (single_node) {
sem.signal(k);
}
return diag_block;
});
if (k + kb == n)
diag_block_sf.get(); // This is the last task: synchronize with it.
}
if (k < n - nb) {
// If this node belongs to the column of the panel, broadcast the diagonal block and
// update the panel.
if (index_diag_l.indexCol().isMine()) {
// Broadcast the diagonal block columnwise.
int root = index_diag_l.indexRow().owner();
if (proc_grid_size.first > 1) {
auto taskname = profiler::createNameHP("HP Bcast Col Diag Block", k / nb);
if (index_diag_l.indexRow().isMine()) {
// Bcast Root.
comm2D_ft = hpx::dataflow(
mpi_executor,
[root, taskname](hpx::future<CommType> &&comm2D_ft,
hpx::shared_future<Matrix<T>> &&diag_block_sf)
{
profiler::TaskProfiler tp(taskname, "Comm");
const auto& diag_block = diag_block_sf.get();
auto comm2D = comm2D_ft.get();
int kb = diag_block.m();
comm2D->colBcast(Message(kb, kb, diag_block.ptr(), diag_block.ld()), root);
return comm2D;
},
comm2D_ft, diag_block_sf);
}
else {
// Others.
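// Non-root ranks receive the block into a freshly allocated workspace.
// The receive task returns a pair (communicator, received block) and
// hpx::split_future turns it into two separate futures, so the
// communication chain (comm2D_ft) and the data dependency (diag_block_sf)
// can be consumed independently.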
std::tie(comm2D_ft, diag_block_sf) = hpx::split_future(
comm2D_ft.then(
mpi_executor,
[kb, root, taskname](hpx::future<CommType> &&comm2D_ft)
{
profiler::TaskProfiler tp(taskname, "Comm");
Matrix<T> block(kb, kb);
auto comm2D = comm2D_ft.get();
comm2D->colBcast(Message(kb, kb, block.ptr(), block.ld()), root);
return std::make_pair(comm2D, std::move(block));
}));
}
}
// Update the panel.
for (int i = k + nb; i < n; i += nb) {
auto index_block_l = a.localIndex(i, k);
if (index_block_l.indexRow().isMine()) {
auto taskname = profiler::createNameHP("HP Update Panel", k / nb, i / nb);
panel_sf[i] = hpx::dataflow(
*matrix_HP,
[nb,taskname](hpx::shared_future<Matrix<T>> &&diag_block_sf,
hpx::future<Matrix<T>> &&panel_ft)
{
profiler::TaskProfiler tp(taskname, "Panel");
auto panel = panel_ft.get();
const auto& diag_block = diag_block_sf.get();
trsm("R", "L", "C", "N", panel.m(), panel.n(), 1., diag_block.ptr(),
diag_block.ld(), panel.ptr(), panel.ld());
return panel;
},
diag_block_sf, block_ft_acc(i, k));
}
}
}
// Broadcast the panel rowwise.
// This is only done when we are running with more than one rank
if (proc_grid_size.second > 1) {
int root = index_diag_l.indexCol().owner();
for (int i = k + nb; i < n; i += nb) {
auto index_block_l = a.localIndex(i, k);
if (index_block_l.indexRow().isMine()) {
auto taskname = profiler::createNameHP("HP Bcast Row Panel", k / nb, i / nb);
if (index_diag_l.indexCol().isMine()) {
// Bcast root.
comm2D_ft = hpx::dataflow(
mpi_executor,
[root, taskname, &sem, k](hpx::future<CommType> &&comm2D_ft,
hpx::shared_future<Matrix<T>> &&panel_sf)
{
profiler::TaskProfiler tp(taskname, "Comm");
const auto& panel = panel_sf.get();
auto comm2D = comm2D_ft.get();
comm2D->rowBcast(Message(panel.m(), panel.n(), panel.ptr(), panel.ld()), root);
//std::cout << "Rank " << comm2D->myId().rank() << " Signalling for k " << k << std::endl;
sem.signal(k);
return comm2D;
},
comm2D_ft, panel_sf[i]);
}
else {
// Others.
int ib = std::min(nb, n - i);
std::tie(comm2D_ft, panel_sf[i]) = hpx::split_future(comm2D_ft.then(
mpi_executor,
[ib, kb, root, taskname, &sem, k](hpx::future<CommType> &&comm2D_ft)
{
profiler::TaskProfiler tp(taskname, "Comm");
Matrix<T> panel(ib, kb);
auto comm2D = comm2D_ft.get();
comm2D->rowBcast(Message(panel.m(), panel.n(), panel.ptr(), panel.ld()), root);
//std::cout << "Rank " << comm2D->myId().rank() << " Signalling for k " << k << std::endl;
sem.signal(k);
return std::make_pair(comm2D, std::move(panel));
}));
}
semaphore_used = true;
}
}
// if this rank didn't participate in the row broadcast,
// then just signal so that we don't cause a deadlock
if (!semaphore_used) {
//std::cout << "Rank " << a.comm().myId().rank() << " Signalling fake k " << k << std::endl;
sem.signal(k);
}
}
// Loop over block columns of the trailing matrix.
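// For each block column j of the trailing matrix owned by this rank: broadcast
// the corresponding panel block columnwise, update the diagonal block a(j,j)
// with a HERK, and update the blocks a(i,j) below it with GEMMs against the
// broadcast panel.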
for (int j = k + kb; j < n; j += nb) {
auto index_j_l = a.localIndex(j, j);
if (index_j_l.indexCol().isMine()) {
int jb = std::min(nb, n - j);
// Set up the pointer and leading dimension of one block of the transposed panel (receive
// it in a workspace).
hpx::shared_future<Matrix<T>> panel_block_sf;
// Broadcast the panel block columnwise.
int root = index_j_l.indexRow().owner();
auto taskname = profiler::createNameHP("HP Bcast Col Panel", k / nb, j / nb);
if (index_j_l.indexRow().isMine()) {
panel_block_sf = panel_sf[j];
// Bcast root.
if (proc_grid_size.first > 1) {
comm2D_ft = hpx::dataflow( //
mpi_executor,
[root, taskname](hpx::future<CommType> &&comm2D_ft,
hpx::shared_future<Matrix<T>> &&tr_panel_block_sf)
{
profiler::TaskProfiler tp(taskname, "Comm");
const auto& tr_panel_block = tr_panel_block_sf.get();
auto comm2D = comm2D_ft.get();
comm2D->colBcast(Message(tr_panel_block.m(), tr_panel_block.n(),
tr_panel_block.ptr(), tr_panel_block.ld()),
root);
return comm2D;
},
comm2D_ft, panel_block_sf);
}
}
else {
// Others.
std::tie(comm2D_ft, panel_block_sf) = hpx::split_future(comm2D_ft.then(
mpi_executor,
[jb, kb, root, taskname](hpx::future<CommType> &&comm2D_ft)
{
profiler::TaskProfiler tp(taskname, "Comm");
Matrix<T> tr_panel_block(jb, kb);
auto comm2D = comm2D_ft.get();
comm2D->colBcast(Message(tr_panel_block.m(), tr_panel_block.n(),
tr_panel_block.ptr(), tr_panel_block.ld()),
root);
return std::make_pair(comm2D, std::move(tr_panel_block));
}));
}
// The first panel of the trailing matrix needs higher priority.
auto policy_trailing_matrix_panel = matrix_normal;
std::string name_prefix;
if (j == k + kb) {
policy_trailing_matrix_panel = matrix_HP;
name_prefix = "HP ";
}
else {
policy_trailing_matrix_panel = matrix_normal;
name_prefix = "";
}
// Update a column of the trailing matrix.
if (index_j_l.indexRow().isMine()) {
auto taskname = profiler::createName(name_prefix + "Update Diag Trailing Matrix",
k / nb, j / nb, j / nb);
block_ft_acc(j, j) = hpx::dataflow(
*policy_trailing_matrix_panel,
[taskname](hpx::shared_future<Matrix<T>> &&panel_block_sf,
hpx::future<Matrix<T>> &&diag_block_ft)
{
profiler::TaskProfiler tp(taskname, "Trailing Matrix");
const auto& panel_block = panel_block_sf.get();
auto diag_block = diag_block_ft.get();
herk("L", "N", panel_block.m(), panel_block.n(), -1., panel_block.ptr(),
panel_block.ld(), 1., diag_block.ptr(), diag_block.ld());
return diag_block;
},
panel_block_sf, block_ft_acc(j, j));
}
for (int i = j + nb; i < n; i += nb) {
auto index_i_l = a.localIndex(i, j);
if (index_i_l.indexRow().isMine()) {
auto taskname = profiler::createName(name_prefix + "Update Trailing Matrix", k / nb,
j / nb, i / nb);
block_ft_acc(i, j) = hpx::dataflow(
*policy_trailing_matrix_panel,
[taskname](hpx::shared_future<Matrix<T>> &&panel_sf,
hpx::shared_future<Matrix<T>> &&panel_block_sf,
hpx::future<Matrix<T>> &&matrix_panel_ft)
{
profiler::TaskProfiler tp(taskname, "Trailing Matrix");
const auto& panel = panel_sf.get();
const auto& panel_block = panel_block_sf.get();
auto matrix_panel = matrix_panel_ft.get();
gemm("N", "C", panel.m(), panel_block.m(), panel_block.n(), -1., panel.ptr(),
panel.ld(), panel_block.ptr(), panel_block.ld(), 1., matrix_panel.ptr(),
matrix_panel.ld());
return matrix_panel;
},
panel_sf[i], panel_block_sf, block_ft_acc(i, j));
}
}
}
}
}
// std::cout << "Waiting on semaphore with k=" << k << std::endl;
sem.wait(k);
//std::cout << "Gone past wait at k=" << k << std::endl;
}
delete matrix_HP;
delete matrix_normal;
// The completion of all MPI communication and of the last diagonal block (the .get() above)
// means that all the tasks on all the nodes have executed.
// Ensure that all MPI communication has completed:
comm2D_ft.get();
}
#ifdef RING_BCAST
#undef colBcast
#undef rowBcast
#endif
#endif // CHOLESKY_HPX_MPI_H