-
-
Save biddisco/9bfed7469a7ee13be228775165b463ee to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#ifndef CHOLESKY_HPX_MPI_H | |
#define CHOLESKY_HPX_MPI_H | |
#include "matrixdist.h" | |
#include "matrix.h" | |
#include "blas_lapack.h" | |
#include "util.h" | |
// | |
#include "hpx/hpx.hpp" | |
#include "hpx_profiler.h" | |
// | |
#include <plugins/parcelport/parcelport_logging.hpp> | |
#include <hpx/lcos/local/detail/sliding_semaphore.hpp> | |
#include <hpx/runtime/threads/executors/guided_pool_executor.hpp> | |
//#define GUIDED_POOL_EXEC | |
#define GUIDED_POOL_SHIM | |
#ifdef RING_BCAST | |
#define colBcast colRingBcast | |
#define rowBcast rowRingBcast | |
#endif | |
static bool use_pools = false; | |
static bool use_scheduler = false; | |
static int mpi_threads = 1; | |
static int hp_queues = 1; | |
static bool use_guided_executor = false; | |
static int allocator_mode = 0; | |
static int num_queues = 1; | |
// ------------------------------------------------------------------------ | |
// specialize the numa hint template a simple single matrix input task | |
namespace hpx { namespace threads { namespace executors | |
{ | |
struct cholesky_tag {}; | |
template <typename MatrixType> | |
struct HPX_EXPORT pool_numa_hint<MatrixType, cholesky_tag> { | |
// @TODO : remove the bitmap alloc and use a preallocated onefrom TLS | |
// | |
// The call operator () must return an int type | |
// The arguments must be const ref versions of the equivalent task arguments | |
// harware check : int dom = threads::get_topology().get_numa_domain(address); | |
int operator()(const MatrixType& matrix) const { | |
int dom = matrix.get_numa(); | |
// std::cout << "Numa hint 1 returning " << dom <<"\n"; | |
return dom; | |
} | |
int operator()(const MatrixType& matrix1, const MatrixType& matrix2) const { | |
int dom = matrix2.get_numa(); | |
// std::cout << "Numa hint 2 returning " << dom <<"\n"; | |
return dom; | |
} | |
int operator()(const MatrixType& matrix1, const MatrixType& matrix2, | |
const MatrixType& matrix3) const | |
{ | |
int dom = matrix3.get_numa(); | |
// std::cout << "Numa hint 3 returning " << dom <<"\n"; | |
return dom; | |
} | |
}; | |
}}} | |
using namespace hpx::threads::executors; | |
// ------------------------------------------------------------------------ | |
template <typename T, typename C2D, typename Allocator=std::allocator<T>> | |
void cholesky(MatrixDist<T, C2D, Allocator>& a) | |
{ | |
using CommType = const C2D*; | |
// setup guided executors for different task priorities on the default (matrix) pool | |
using matrix_hint_type = pool_numa_hint<Matrix<T>, cholesky_tag>; | |
#ifdef GUIDED_POOL_SHIM | |
guided_pool_executor_shim<matrix_hint_type> *matrix_HP; | |
guided_pool_executor_shim<matrix_hint_type> *matrix_normal; | |
if (use_guided_executor) { | |
matrix_HP = new guided_pool_executor_shim<matrix_hint_type>(true, "default", | |
hpx::threads::thread_priority_high); | |
matrix_normal = new guided_pool_executor_shim<matrix_hint_type>(true, "default", | |
hpx::threads::thread_priority_default); | |
} | |
else { | |
// setup executors for different task priorities on the default (matrix) pool | |
matrix_HP = new guided_pool_executor_shim<matrix_hint_type>(false, "default", | |
hpx::threads::thread_priority_high); | |
matrix_normal = new guided_pool_executor_shim<matrix_hint_type>(false, "default", | |
hpx::threads::thread_priority_default); | |
} | |
#else | |
pool_executor *matrix_HP; | |
pool_executor *matrix_normal; | |
matrix_HP = new pool_executor("default", | |
hpx::threads::thread_priority_high); | |
matrix_normal = new pool_executor("default", | |
hpx::threads::thread_priority_default); | |
#endif | |
// create an executor on the mpi pool | |
hpx::threads::scheduled_executor mpi_executor; | |
auto m_size = a.comm().size(); | |
if (use_pools && (std::get<0>(m_size) * std::get<1>(m_size)) > 1) { | |
// setup mpi executor | |
mpi_executor = hpx::threads::executors::pool_executor("mpi"); | |
std::cout << "\n[hpx_main] got mpi executor " << std::endl; | |
} | |
else { | |
// this should not be used, but is provided for safe fallback operation | |
mpi_executor = hpx::threads::executors::pool_executor("default"); | |
} | |
util::check_square(a, "a"); | |
util::check_square_block_dist(a, "a"); | |
int n = a.m(); | |
int nb = a.nb(); | |
auto proc_grid_size = a.comm().size(); | |
hpx::future<CommType> comm2D_ft = hpx::make_ready_future(&a.comm()); | |
std::map<std::size_t, hpx::future<Matrix<T>>> block_ft; | |
size_t ld_block_ft = n; | |
for (int j = 0; j < n; j += nb) { | |
int jb = std::min(nb, n - j); // panel width | |
for (int i = 0; i < n; i += nb) { | |
int ib = std::min(nb, n - i); // panel height | |
auto index_l = a.localIndex(i, j); | |
if (index_l.isMine()) { | |
int sub_col = (a.ptr(index_l) - a.ptr()) / a.localMatrix().get_allocator().matrix_.leading_dim; | |
int sub_row = (a.ptr(index_l) - a.ptr()) % a.localMatrix().get_allocator().matrix_.leading_dim; | |
block_ft.emplace(i + ld_block_ft * j, | |
hpx::make_ready_future(Matrix<T>(ib, jb, a.ptr(index_l), a.ld(), | |
// @TODO - clean this up | |
std::bind(&Allocator::get_domain, | |
sub_col, sub_row, | |
a.localMatrix().get_allocator().matrix_.tile_dim, | |
a.localMatrix().get_allocator().matrix_.tiles_per_domain, | |
a.localMatrix().get_allocator().matrix_.num_domains, | |
4096) | |
))); | |
} | |
} | |
} | |
auto block_ft_acc = [&block_ft, ld_block_ft ](int i, int j) -> auto& { | |
return block_ft[i + ld_block_ft * j]; | |
}; | |
// To limit the depth of dependency tree we allow N iterations to unroll at a time | |
const int tasks = (hpx::get_num_worker_threads()*500)/((n/nb)*(n/nb)); | |
const int semaphore_limit = std::max(2, tasks); | |
std::cout << "Semaphore limit set to " << semaphore_limit | |
<< " assuming tasks " << tasks << std::endl; | |
hpx::lcos::local::sliding_semaphore sem(semaphore_limit*nb); | |
bool single_node = proc_grid_size.second<2; | |
for (int k = 0; k < n; k += nb) { | |
int kb = std::min(nb, n - k); // panel width | |
auto index_diag_l = a.localIndex(k, k); | |
hpx::shared_future<Matrix<T>> diag_block_sf; | |
std::map<int, hpx::shared_future<Matrix<T>>> panel_sf; | |
// not all ranks participate in all iterations, so don't use the semaphore | |
// if we are not avtive - keep track of latest activation of the semaphore | |
bool semaphore_used = false; | |
// If the diagonal block is on this node factorize it. | |
if (index_diag_l.isMine()) { | |
auto taskname = profiler::createNameHP("HP Fact. Diag Block", k / nb); | |
diag_block_sf = block_ft_acc(k, k).then( | |
*matrix_HP, | |
[k, taskname, &sem, single_node](hpx::future<Matrix<T>>&& diag_block_ft) | |
{ | |
profiler::TaskProfiler tp(taskname, "Panel"); | |
int info = 0; | |
auto diag_block = diag_block_ft.get(); | |
// | |
potrf("L", diag_block.m(), diag_block.ptr(), diag_block.ld(), &info); | |
if (info != 0) { | |
util::abort("xPOTRF of a(" + std::to_string(k) + ", " + std::to_string(k) + | |
") returned info = " + std::to_string(info)); | |
} | |
if (single_node) { | |
sem.signal(k); | |
} | |
return diag_block; | |
}); | |
if (k + kb == n) | |
diag_block_sf.get(); // This is the last task: synchronize with it. | |
} | |
if (k < n - nb) { | |
// If this node belong to the column of the panel broadcast the diagonal block and update the | |
// panel. | |
if (index_diag_l.indexCol().isMine()) { | |
// Broadcast the diagonal block columnwise. | |
int root = index_diag_l.indexRow().owner(); | |
if (proc_grid_size.first > 1) { | |
auto taskname = profiler::createNameHP("HP Bcast Col Diag Block", k / nb); | |
if (index_diag_l.indexRow().isMine()) { | |
// Bcast Root. | |
comm2D_ft = hpx::dataflow( | |
mpi_executor, | |
[root, taskname](hpx::future<CommType> &&comm2D_ft, | |
hpx::shared_future<Matrix<T>> &&diag_block_sf) | |
{ | |
profiler::TaskProfiler tp(taskname, "Comm"); | |
const auto& diag_block = diag_block_sf.get(); | |
auto comm2D = comm2D_ft.get(); | |
int kb = diag_block.m(); | |
comm2D->colBcast(Message(kb, kb, diag_block.ptr(), diag_block.ld()), root); | |
return comm2D; | |
}, | |
comm2D_ft, diag_block_sf); | |
} | |
else { | |
// Others. | |
std::tie(comm2D_ft, diag_block_sf) = hpx::split_future( | |
comm2D_ft.then( | |
mpi_executor, | |
[kb, root, taskname](hpx::future<CommType> &&comm2D_ft) | |
{ | |
profiler::TaskProfiler tp(taskname, "Comm"); | |
Matrix<T> block(kb, kb); | |
auto comm2D = comm2D_ft.get(); | |
comm2D->colBcast(Message(kb, kb, block.ptr(), block.ld()), root); | |
return std::make_pair(comm2D, std::move(block)); | |
})); | |
} | |
} | |
// Update the panel. | |
for (int i = k + nb; i < n; i += nb) { | |
auto index_block_l = a.localIndex(i, k); | |
if (index_block_l.indexRow().isMine()) { | |
auto taskname = profiler::createNameHP("HP Update Panel", k / nb, i / nb); | |
panel_sf[i] = hpx::dataflow( | |
*matrix_HP, | |
[nb,taskname](hpx::shared_future<Matrix<T>> &&diag_block_sf, | |
hpx::future<Matrix<T>> &&panel_ft) | |
{ | |
profiler::TaskProfiler tp(taskname, "Panel"); | |
auto panel = panel_ft.get(); | |
const auto& diag_block = diag_block_sf.get(); | |
trsm("R", "L", "C", "N", panel.m(), panel.n(), 1., diag_block.ptr(), | |
diag_block.ld(), panel.ptr(), panel.ld()); | |
return panel; | |
}, | |
diag_block_sf, block_ft_acc(i, k)); | |
} | |
} | |
} | |
// Broadcast the panel rowwise. | |
// This is only called when we a running with >1 ranks | |
if (proc_grid_size.second > 1) { | |
int root = index_diag_l.indexCol().owner(); | |
for (int i = k + nb; i < n; i += nb) { | |
auto index_block_l = a.localIndex(i, k); | |
if (index_block_l.indexRow().isMine()) { | |
auto taskname = profiler::createNameHP("HP Bcast Row Panel", k / nb, i / nb); | |
if (index_diag_l.indexCol().isMine()) { | |
// Bcast root. | |
comm2D_ft = hpx::dataflow( | |
mpi_executor, | |
[root, taskname, &sem, k](hpx::future<CommType> &&comm2D_ft, | |
hpx::shared_future<Matrix<T>> &&panel_sf) | |
{ | |
profiler::TaskProfiler tp(taskname, "Comm"); | |
const auto& panel = panel_sf.get(); | |
auto comm2D = comm2D_ft.get(); | |
comm2D->rowBcast(Message(panel.m(), panel.n(), panel.ptr(), panel.ld()), root); | |
//std::cout << "Rank " << comm2D->myId().rank() << " Signalling for k " << k << std::endl; | |
sem.signal(k); | |
return comm2D; | |
}, | |
comm2D_ft, panel_sf[i]); | |
} | |
else { | |
// Others. | |
int ib = std::min(nb, n - i); | |
std::tie(comm2D_ft, panel_sf[i]) = hpx::split_future(comm2D_ft.then( | |
mpi_executor, | |
[ib, kb, root, taskname, &sem, k](hpx::future<CommType> &&comm2D_ft) | |
{ | |
profiler::TaskProfiler tp(taskname, "Comm"); | |
Matrix<T> panel(ib, kb); | |
auto comm2D = comm2D_ft.get(); | |
comm2D->rowBcast(Message(panel.m(), panel.n(), panel.ptr(), panel.ld()), root); | |
//std::cout << "Rank " << comm2D->myId().rank() << " Signalling for k " << k << std::endl; | |
sem.signal(k); | |
return std::make_pair(comm2D, std::move(panel)); | |
})); | |
} | |
semaphore_used = true; | |
} | |
} | |
// if this rank didn't participate in the row broadcast, | |
// then just signal so that we don't cause a deadlock | |
if (!semaphore_used) { | |
//std::cout << "Rank " << a.comm().myId().rank() << " Signalling fake k " << k << std::endl; | |
sem.signal(k); | |
} | |
} | |
// Loop over block columns of the trailing matrix. | |
for (int j = k + kb; j < n; j += nb) { | |
auto index_j_l = a.localIndex(j, j); | |
if (index_j_l.indexCol().isMine()) { | |
int jb = std::min(nb, n - j); | |
// Set up the pointer and leading dimension of one block of the transposed panel (receive | |
// it in a workspace). | |
hpx::shared_future<Matrix<T>> panel_block_sf; | |
// Broadcast the panel block columnwise. | |
int root = index_j_l.indexRow().owner(); | |
auto taskname = profiler::createNameHP("HP Bcast Col Panel", k / nb, j / nb); | |
if (index_j_l.indexRow().isMine()) { | |
panel_block_sf = panel_sf[j]; | |
// Bcast root. | |
if (proc_grid_size.first > 1) { | |
comm2D_ft = hpx::dataflow( // | |
mpi_executor, | |
[root, taskname](hpx::future<CommType> &&comm2D_ft, | |
hpx::shared_future<Matrix<T>> &&tr_panel_block_sf) | |
{ | |
profiler::TaskProfiler tp(taskname, "Comm"); | |
const auto& tr_panel_block = tr_panel_block_sf.get(); | |
auto comm2D = comm2D_ft.get(); | |
comm2D->colBcast(Message(tr_panel_block.m(), tr_panel_block.n(), | |
tr_panel_block.ptr(), tr_panel_block.ld()), | |
root); | |
return comm2D; | |
}, | |
comm2D_ft, panel_block_sf); | |
} | |
} | |
else { | |
// Others. | |
std::tie(comm2D_ft, panel_block_sf) = hpx::split_future(comm2D_ft.then( | |
mpi_executor, | |
[jb, kb, root, taskname](hpx::future<CommType> &&comm2D_ft) | |
{ | |
profiler::TaskProfiler tp(taskname, "Comm"); | |
Matrix<T> tr_panel_block(jb, kb); | |
auto comm2D = comm2D_ft.get(); | |
comm2D->colBcast(Message(tr_panel_block.m(), tr_panel_block.n(), | |
tr_panel_block.ptr(), tr_panel_block.ld()), | |
root); | |
return std::make_pair(comm2D, std::move(tr_panel_block)); | |
})); | |
} | |
// First panel of the trailing matrix need higher priority. | |
auto policy_trailing_matrix_panel = matrix_normal; | |
std::string name_prefix; | |
if (j == k + kb) { | |
policy_trailing_matrix_panel = matrix_HP; | |
name_prefix = "HP "; | |
} | |
else { | |
policy_trailing_matrix_panel = matrix_normal; | |
name_prefix = ""; | |
} | |
// Update a column of the trailing matrix. | |
if (index_j_l.indexRow().isMine()) { | |
auto taskname = profiler::createName(name_prefix + "Update Diag Trailing Matrix", | |
k / nb, j / nb, j / nb); | |
block_ft_acc(j, j) = hpx::dataflow( | |
*policy_trailing_matrix_panel, | |
[taskname](hpx::shared_future<Matrix<T>> &&panel_block_sf, | |
hpx::future<Matrix<T>> &&diag_block_ft) | |
{ | |
profiler::TaskProfiler tp(taskname, "Trailing Matrix"); | |
const auto& panel_block = panel_block_sf.get(); | |
auto diag_block = diag_block_ft.get(); | |
herk("L", "N", panel_block.m(), panel_block.n(), -1., panel_block.ptr(), | |
panel_block.ld(), 1., diag_block.ptr(), diag_block.ld()); | |
return diag_block; | |
}, | |
panel_block_sf, block_ft_acc(j, j)); | |
} | |
for (int i = j + nb; i < n; i += nb) { | |
auto index_i_l = a.localIndex(i, j); | |
if (index_i_l.indexRow().isMine()) { | |
auto taskname = profiler::createName(name_prefix + "Update Trailing Matrix", k / nb, | |
j / nb, i / nb); | |
block_ft_acc(i, j) = hpx::dataflow( | |
*policy_trailing_matrix_panel, | |
[taskname](hpx::shared_future<Matrix<T>> &&panel_sf, | |
hpx::shared_future<Matrix<T>> &&panel_block_sf, | |
hpx::future<Matrix<T>> &&matrix_panel_ft) | |
{ | |
profiler::TaskProfiler tp(taskname, "Trailing Matrix"); | |
const auto& panel = panel_sf.get(); | |
const auto& panel_block = panel_block_sf.get(); | |
auto matrix_panel = matrix_panel_ft.get(); | |
gemm("N", "C", panel.m(), panel_block.m(), panel_block.n(), -1., panel.ptr(), | |
panel.ld(), panel_block.ptr(), panel_block.ld(), 1., matrix_panel.ptr(), | |
matrix_panel.ld()); | |
return matrix_panel; | |
}, | |
panel_sf[i], panel_block_sf, block_ft_acc(i, j)); | |
} | |
} | |
} | |
} | |
} | |
// std::cout << "Signalling semaphore with k=" << k <<std::endl; | |
sem.wait(k); | |
//std::cout << "Gone past wait at k=" << k << std::endl; | |
} | |
delete matrix_HP; | |
delete matrix_normal; | |
// The execution of all MPI communication and of the last diagonal block (.get() above) | |
// means that all the tasks in all the nodes have executed. | |
// Ensure that all MPI communication happened: | |
comm2D_ft.get(); | |
} | |
#ifdef RING_BCAST | |
#undef colBcast | |
#undef rowBcast | |
#endif | |
#endif // CHOLESKY_HPX_MPI_H |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment