Asynchronous reading and writing partitioned data
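This gist contains two MPI benchmark drivers and the two headers they exercise: AsyncParallelPropertyBroadcaster<T>, where a subset of reader ranks pread() chunks of a raw file and broadcast them to every rank, and ParallelPropertyGatherer<T>, where every rank gathers its partitioned values onto a subset of writer ranks that pwrite() them back to disk. Both headers rely on a traits header, "../mpitraits.h", that is not part of the gist; the only requirement visible here is that MPITraits<T>::get_MPI_type() returns the MPI datatype matching T. A minimal sketch of such a header (an assumption for illustration, not the author's file) could look like this:

// mpitraits.h -- hypothetical sketch, not the file used by the author; the gist only
// shows that MPITraits<T>::get_MPI_type() must return the MPI datatype matching T.
#ifndef MPITRAITS_SKETCH
#define MPITRAITS_SKETCH
#include <cstdint>
#include <mpi.h>
template<typename T>
struct MPITraits; // no generic definition: unsupported types fail at compile time
template<>
struct MPITraits<int>
{
static MPI_Datatype get_MPI_type() { return MPI_INT; }
};
template<>
struct MPITraits<int64_t>
{
// On typical Linux/LP64 builds int64_t is long, so this also covers the long values
// used by the gatherer test; other platforms may need an extra specialization.
static MPI_Datatype get_MPI_type() { return MPI_INT64_T; }
};
#endif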
/**
* Test driver for AsyncParallelPropertyBroadcaster: reader ranks pread() chunks
* of a raw file and asynchronously broadcast them to the remaining ranks.
*/
#include "parallelpropertybroadcaster.h"
#include <iostream>
#include <cstdlib>
#include <algorithm> // std::min
#include <cassert> // assert
#include <cstdint> // int64_t
#include <unistd.h>
#include <fcntl.h>
#include <boost/bind.hpp>
#include <boost/lexical_cast.hpp>
#include <boost/functional/hash.hpp>
#include <boost/date_time/posix_time/posix_time_types.hpp>
namespace mpi = boost::mpi;
using namespace boost::posix_time;
const int CONTIGUOUS_CHUNK = 1;
const int half_size = 1000;
const int size = half_size * half_size;
const int buff_size = 1024 * 1024;
const int MAX_ITERATIONS = 100;
time_duration t_setup;
time_duration t_generate;
time_duration t_scatter;
time_duration t_read;
time_duration t_flush;
ptime p;
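// Reader functor passed to AsyncParallelPropertyBroadcaster::process(): pread()s one
// chunk at the requested offset and drops it from the page cache with
// posix_fadvise(POSIX_FADV_DONTNEED) so later iterations are not served from the cache.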
struct Reader
{
int file_id;
void read(int64_t* buff, int64_t size, int64_t offset) const
{
t_scatter += microsec_clock::local_time() - p;
p = microsec_clock::local_time();
ssize_t status = pread(file_id, buff, sizeof(int64_t) * size, sizeof(int64_t) * offset);
posix_fadvise(file_id, sizeof(int64_t) * offset, sizeof(int64_t) * size, POSIX_FADV_DONTNEED);
t_read += microsec_clock::local_time() - p;
p = microsec_clock::local_time();
}
};
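// One benchmark iteration: reader ranks open ptest_<i>.raw (the file written by the
// gatherer test) and every rank receives its values through the broadcaster.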
void test(mpi::communicator world_comm, bool reader, int decimation, std::vector<int64_t>& values, const std::vector<std::pair<int64_t, int64_t> >& global_to_local)
{
static int i = 0;
int file_id = 0;
if(reader)
{
file_id = open(("ptest_" + boost::lexical_cast<std::string>(i) + ".raw").c_str(), O_RDONLY);
++i;
}
AsyncParallelPropertyBroadcaster<int64_t> g(&values[0], static_cast<int64_t>(size) * world_comm.size(), global_to_local, buff_size, world_comm, decimation);
Reader custom_reader;
custom_reader.file_id = file_id;
t_setup += microsec_clock::local_time() - p;
p = microsec_clock::local_time();
g.process(boost::bind(&Reader::read, custom_reader, _1, _2, _3));
t_scatter += microsec_clock::local_time() - p;
p = microsec_clock::local_time();
if(reader)
{
close(file_id);
}
t_flush += microsec_clock::local_time() - p;
p = microsec_clock::local_time();
}
int main(int argc, char** argv)
{
mpi::environment env(argc, argv);
mpi::communicator world_comm;
p = microsec_clock::local_time();
int decimation = 16;
if(argc > 1)
{
decimation = boost::lexical_cast<int>(argv[1]);
}
int contiguous_chunk = CONTIGUOUS_CHUNK;
if(argc > 2)
{
contiguous_chunk = boost::lexical_cast<int>(argv[2]);
}
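// Convert the command-line value (the desired number of I/O ranks) into the stride
// between reader ranks; if it does not divide the communicator size, a fallback stride
// is derived from the remainder. A rank is a reader iff rank % decimation == 0.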
if(world_comm.size() % decimation != 0)
{
decimation = world_comm.size() / (world_comm.size() % decimation);
}
else
{
decimation = world_comm.size() / decimation;
}
if(world_comm.rank() == 0)
std::cout << "Setting decimation to " << decimation << std::endl;
t_setup += microsec_clock::local_time() - p;
p = microsec_clock::local_time();
std::vector<int64_t> values(size, 0);
std::vector<std::pair<int64_t,int64_t> > global_to_local(size);
for(int64_t i = 0; i < size / contiguous_chunk; ++i)
{
for(int64_t j = 0; j < contiguous_chunk; ++j)
{
values[i * contiguous_chunk + j] = j + (world_comm.rank() + i * world_comm.size() * contiguous_chunk);
global_to_local[i * contiguous_chunk + j] = std::make_pair(j + (world_comm.rank() + i * world_comm.size() * contiguous_chunk), i * contiguous_chunk + j);
}
}
t_generate += microsec_clock::local_time() - p;
p = microsec_clock::local_time();
for(int i = 0; i < MAX_ITERATIONS; ++i)
{
test(world_comm, world_comm.rank() % decimation == 0, std::min(decimation, world_comm.size()), values, global_to_local);
for(int j = 0; j < size; ++j)
{
assert(values[j] == global_to_local[j].first);
}
t_setup += microsec_clock::local_time() - p;
p = microsec_clock::local_time();
}
std::cout << "Total run time " << (t_setup + t_generate + t_scatter + t_read + t_flush).total_microseconds() / 1e6 << std::endl;
std::cout << "Breakdown run time " << t_setup.total_microseconds() / 1e6 << "\t " << t_generate.total_microseconds() / 1e6 << "\t " << t_scatter.total_microseconds() / 1e6 << "\t " << t_read.total_microseconds() / 1e6 << "\t " << t_flush.total_microseconds() / 1e6 << "\t " << std::endl;
return EXIT_SUCCESS;
}
/**
* Test driver for ParallelPropertyGatherer: all ranks gather their partitioned
* values to writer ranks, which pwrite() them into a shared raw file.
*/
#include "parallelpropertygatherer.h"
#include <iostream>
#include <cstdlib>
#include <unistd.h>
#include <fcntl.h>
#include <boost/bind.hpp>
#include <boost/lexical_cast.hpp>
#include <boost/functional/hash.hpp>
#include <boost/date_time/posix_time/posix_time_types.hpp>
#include "pmain.h"
namespace mpi = boost::mpi;
using namespace boost::posix_time;
const int half_size = 1000;
const int size = half_size * half_size;
const int buff_size = 1024 * 1024;
const int MAX_ITERATIONS = 100;
time_duration t_setup;
time_duration t_generate;
time_duration t_gather;
time_duration t_write;
time_duration t_flush;
ptime p;
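// Writer functor passed to ParallelPropertyGatherer::process(): pwrite()s one gathered
// chunk at the requested offset.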
struct Writer
{
int file_id;
void write(long* buff, long size, long offset)
{
t_gather += microsec_clock::local_time() - p;
p = microsec_clock::local_time();
ssize_t status = pwrite(file_id, buff, sizeof(long) * size, sizeof(long) * offset);
t_write += microsec_clock::local_time() - p;
p = microsec_clock::local_time();
}
};
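// One benchmark iteration: rank 0 creates ptest_<i>.raw, writer ranks reopen it, and
// every rank pushes its partitioned values through the gatherer.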
void test(mpi::communicator world_comm, int decimation, const std::vector<long>& values, const std::vector<std::pair<int, int> >& global_to_local)
{
static int i = 0;
int file_id = -1;
bool writer = world_comm.rank() % decimation == 0;
if(world_comm.rank() == 0)
{
file_id = creat(("ptest_" + boost::lexical_cast<std::string>(i) + ".raw").c_str(), 0644);
close(file_id);
}
MPI_Barrier(world_comm);
if(writer)
{
file_id = open(("ptest_" + boost::lexical_cast<std::string>(i) + ".raw").c_str(), O_RDWR);
++i;
}
ParallelPropertyGatherer<long> g(&values[0], values.size() * world_comm.size(), global_to_local, buff_size, world_comm, writer, decimation);
Writer custom_writer;
custom_writer.file_id = file_id;
t_setup += microsec_clock::local_time() - p;
p = microsec_clock::local_time();
g.process(boost::bind(&Writer::write, custom_writer, _1, _2, _3));
t_gather += microsec_clock::local_time() - p;
p = microsec_clock::local_time();
if(writer)
{
close(file_id);
}
t_flush += microsec_clock::local_time() - p;
p = microsec_clock::local_time();
}
int main(int argc, char** argv)
{
mpi::environment env(argc, argv);
mpi::communicator world_comm;
p = microsec_clock::local_time();
int decimation = 16;
if(argc > 1)
{
decimation = boost::lexical_cast<int>(argv[1]);
}
if(world_comm.size() % decimation != 0)
{
decimation = world_comm.size() / (world_comm.size() % decimation);
}
else
{
decimation = world_comm.size() / decimation;
}
if(world_comm.rank() == 0)
std::cout << "Setting decimation to " << decimation << std::endl;
mpi::communicator gather_comm = generate_local_communicator(world_comm);
mpi::communicator writer_comm = world_comm.split(gather_comm.rank());
t_setup += microsec_clock::local_time() - p;
p = microsec_clock::local_time();
std::vector<long> values(size, 0);
std::vector<std::pair<int, int> > global_to_local(size);
for(int i = 0; i < size; ++i)
{
values[i] = world_comm.rank() + i * world_comm.size();
global_to_local[i] = std::make_pair(world_comm.rank() + i * world_comm.size(), i);
}
t_generate += microsec_clock::local_time() - p;
p = microsec_clock::local_time();
for(int i = 0; i < MAX_ITERATIONS; ++i)
{
test(world_comm, decimation, values, global_to_local);
}
std::cout << "Total run time " << (t_setup + t_generate + t_gather + t_write + t_flush).total_microseconds() / 1e6 << std::endl;
std::cout << "Breakdown run time " << t_setup.total_microseconds() / 1e6 << "\t " << t_generate.total_microseconds() / 1e6 << "\t " << t_gather.total_microseconds() / 1e6 << "\t " << t_write.total_microseconds() / 1e6 << "\t " << t_flush.total_microseconds() / 1e6 << "\t " << std::endl;
return EXIT_SUCCESS;
}
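// Split the world communicator by a hash of the processor name so that ranks running
// on the same node end up in the same local communicator.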
mpi::communicator generate_local_communicator(mpi::communicator global_comm)
{
std::size_t hash = boost::hash<std::string>()(mpi::environment::processor_name());
// MPI split colors must be non-negative, so fold the hash into the positive int range
mpi::communicator local_comm = global_comm.split(static_cast<int>(hash % 2147483647));
return local_comm;
}
/***
* \file parallelpropertybroadcaster.h
*/
#ifndef PARALLELPROPERTYBROADCASTER
#define PARALLELPROPERTYBROADCASTER
#include <algorithm> // std::min
#include <cstdint> // int64_t
#include <future>
#include <map>
#include <numeric>
#include <thread>
#include <vector>
#include <boost/mpi.hpp>
#include "../mpitraits.h"
template<typename T>
class AsyncParallelPropertyBroadcaster
{
private:
T* m_values;
int64_t m_total_number_of_values;
int64_t m_begin_id;
const std::vector<std::pair<int64_t,int64_t> >& m_global_to_local_id;
int64_t m_buffer_size;
boost::mpi::communicator m_global_comm;
int m_decimation;
const int m_max_buffers;
std::vector<int> m_requests_state;
std::vector<MPI_Request> m_multirequests;
int64_t m_offset;
std::vector<std::vector<T> > m_output_buffers;
std::vector<T> m_thread_chunk;
const int m_number_of_chunks;
int m_comm_size;
int m_processed_chunks;
int m_created_chunks;
std::size_t m_current_processed_value;
std::future<void> m_future;
public:
AsyncParallelPropertyBroadcaster(T* values, int64_t total_number_of_values, const std::vector<std::pair<int64_t,int64_t> >& global_to_local_id, int buffer_size, boost::mpi::communicator global_comm, int decimation)
:m_values(values), m_total_number_of_values(total_number_of_values), m_global_to_local_id(global_to_local_id), m_buffer_size(buffer_size/sizeof(T)), m_global_comm(global_comm), m_decimation(decimation),
m_max_buffers(m_global_comm.size()/decimation), m_number_of_chunks((total_number_of_values + m_buffer_size - 1) / m_buffer_size)
{
m_comm_size = m_global_comm.size();
m_multirequests.resize(m_max_buffers);
m_output_buffers.resize(m_max_buffers);
m_requests_state.assign(m_max_buffers, -1);
m_processed_chunks = 0;
m_created_chunks = 0;
m_current_processed_value = 0;
m_begin_id = -m_buffer_size;
m_offset = (m_global_comm.rank() / decimation - m_max_buffers) * m_buffer_size;
}
template<class Reader>
void process(const Reader& callback_reader)
{
if(m_global_comm.rank() % m_decimation == 0)
{
launch_read(callback_reader); // start reading
}
while(m_processed_chunks != m_number_of_chunks)
{
for(int i = 0; i < m_max_buffers; ++i)
{
if(m_created_chunks != m_number_of_chunks && m_requests_state[i] == -1)
{
create_new_chunk(i);
}
else if(m_requests_state[i] == 0) // Wait end read + start new read + Broadcast
{
test_stage_one(i, callback_reader);
}
else if(m_requests_state[i] == 1) // Wait broadcast
{
test_stage_two(i);
}
else if(m_requests_state[i] == 2) // Populate values
{
test_stage_three(i);
}
}
}
}
private:
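// Advance the file offset by one full window (m_max_buffers chunks) and start an
// asynchronous read of the next chunk owned by this reader rank.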
template<class Reader>
void launch_read(const Reader& callback_reader)
{ // Pending C++11
m_offset += m_max_buffers * m_buffer_size;
int64_t new_size = std::min(m_total_number_of_values - m_offset, m_buffer_size);
if(new_size <= 0)
return;
m_thread_chunk.resize(new_size);
// std::launch::async forces the read onto a worker thread instead of a possibly deferred call
m_future = std::async(std::launch::async, callback_reader, m_thread_chunk.data(), m_thread_chunk.size(), m_offset);
}
void create_new_chunk(int available)
{
int64_t offset = m_created_chunks * m_buffer_size;
m_output_buffers[available].resize(std::min(m_total_number_of_values - offset, m_buffer_size));
m_requests_state[available] = 0;
++m_created_chunks;
}
template<class Reader>
void test_stage_one(int i, const Reader& callback_reader)
{
if(m_global_comm.rank() == i * m_decimation)
{
if(m_future.valid())
{
m_future.wait();
}
m_thread_chunk.swap(m_output_buffers[i]);
launch_read(callback_reader); // async read
}
// use the datatype of T rather than hard-coding int64_t
MPI_Ibcast(&m_output_buffers[i][0], m_output_buffers[i].size(), MPITraits<T>::get_MPI_type(), i * m_decimation, m_global_comm, &m_multirequests[i]);
m_requests_state[i] = 1;
}
void test_stage_two(int i)
{
MPI_Wait(&m_multirequests[i], MPI_STATUS_IGNORE);
m_requests_state[i] = 2;
}
void test_stage_three(int j)
{
m_begin_id += m_buffer_size;
if ( m_current_processed_value < m_global_to_local_id.size() )
{
int64_t next_global_id = m_global_to_local_id[m_current_processed_value].first;
// Walk range and fill in values I have
for (std::size_t i = 0; i != m_output_buffers[j].size(); ++i )
{
if ( m_begin_id + static_cast<int64_t>(i) == next_global_id )
{
const int64_t ilocal = m_global_to_local_id[m_current_processed_value].second;
m_values[ilocal] = m_output_buffers[j][i];
// If done all ours then can skip the rest
++m_current_processed_value;
if ( m_current_processed_value == m_global_to_local_id.size())
break;
next_global_id = m_global_to_local_id[m_current_processed_value].first;
}
}
}
m_requests_state[j] = -1;
++m_processed_chunks;
}
};
#endif
/***
* \file parallelpropertygatherer.h
*/
#ifndef PARALLELPROPERTYGATHERER
#define PARALLELPROPERTYGATHERER
#include <algorithm> // std::min
#include <future>
#include <limits> // std::numeric_limits
#include <map>
#include <memory>
#include <numeric>
#include <thread>
#include <vector>
#include <boost/mpi.hpp>
#include "../mpitraits.h"
// Template to define a bad value - needed because of std::pair
template<typename T>
struct BadValue
{
static T value()
{
return std::numeric_limits<T>::min();
}
};
template<>
struct BadValue<std::pair<int,int> >
{
static std::pair<int,int> value()
{
return std::pair<int,int>(std::numeric_limits<int>::min(),std::numeric_limits<int>::min());
}
};
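/**
* Gathers a partitioned property from all ranks onto a subset of writer ranks.
* Each buffer cycles through the states: -1 (free), 0 (size gather in flight),
* 1 (value/index gathers in flight), 2 (gathered, ready to be written asynchronously).
*/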
template<typename T>
class ParallelPropertyGatherer
{
private:
const T* m_values;
int m_total_number_of_values;
const std::vector<std::pair<int,int> >& m_global_to_local_id;
long m_buffer_size;
boost::mpi::communicator m_global_comm;
bool m_writer;
int m_decimation;
const int m_max_buffers;
std::vector<int> m_requests_state;
std::vector<MPI_Request> m_multirequests;
std::vector<std::vector<T> > m_input_buffers;
std::vector<std::vector<long> > m_input_indices;
std::vector<int> m_receiveinsizes;
std::vector<T> m_output_buffers;
std::vector<long> m_output_indices;
std::vector<int> m_receivesizes;
std::vector<int> m_offset_buffers;
long m_offset;
std::vector<T> m_chunk;
std::vector<T> m_thread_chunk;
const int m_number_of_chunks;
int m_comm_size;
int m_writers_size;
int m_processed_chunks;
int m_created_chunks;
std::size_t m_current_processed_value;
std::future<void> m_future;
public:
ParallelPropertyGatherer(const T* values, int total_number_of_values, const std::vector<std::pair<int,int> >& global_to_local_id, int buffer_size, boost::mpi::communicator global_comm, bool writer, int decimation)
:m_values(values), m_total_number_of_values(total_number_of_values), m_global_to_local_id(global_to_local_id), m_buffer_size(buffer_size/sizeof(T)), m_global_comm(global_comm), m_writer(writer), m_decimation(decimation),
m_max_buffers(m_global_comm.size()/decimation), m_number_of_chunks((total_number_of_values + m_buffer_size - 1) / m_buffer_size)
{
m_comm_size = m_global_comm.size();
m_writers_size = m_comm_size / m_decimation;
m_multirequests.resize(2 * m_max_buffers);
m_input_buffers.resize(m_max_buffers);
m_input_indices.resize(m_max_buffers);
m_receivesizes.resize(m_comm_size);
m_offset_buffers.assign(m_comm_size + 1, 0);
m_receiveinsizes.resize(m_max_buffers);
m_requests_state.assign(m_max_buffers, -1);
m_offset = 0;
for(int i = 0; i < m_max_buffers; ++i)
{
m_input_buffers[i].resize(m_buffer_size);
m_input_indices[i].resize(m_buffer_size);
}
m_processed_chunks = 0;
m_created_chunks = 0;
m_current_processed_value = 0;
}
template<class Writer>
void process(const Writer& callback_writer)
{
while(m_processed_chunks != m_number_of_chunks)
{
for(int i = 0; i < m_max_buffers; ++i)
{
if(m_created_chunks != m_number_of_chunks && m_requests_state[i] == -1)
{
create_new_chunk(i);
}
else if(m_requests_state[i] == 0) // Waiting on the gather completion
{
test_stage_one(i);
}
else if(m_requests_state[i] == 1) // Waiting on the two gatherv completion
{
test_stage_two(i);
}
else if(m_requests_state[i] == 2) // Write
{
test_stage_three(i, callback_writer);
}
}
}
if(m_future.valid())
{
m_future.wait();
}
}
private:
void create_new_chunk(int available)
{
int writer_rank = m_decimation * available;
fill_buffer(available, std::min(m_buffer_size, m_total_number_of_values - m_created_chunks * m_buffer_size));
if(m_global_comm.rank() == writer_rank)
{
m_offset = m_created_chunks * m_buffer_size;
}
MPI_Igather(&m_receiveinsizes[available], 1, MPITraits<int>::get_MPI_type(), &m_receivesizes[0], 1, MPITraits<int>::get_MPI_type(), writer_rank, m_global_comm, &m_multirequests[2 * available]);
m_requests_state[available] = 0;
++m_created_chunks;
}
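// Copy the local values whose global id falls inside the chunk currently being created
// into this buffer's staging arrays, together with their global indices.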
void fill_buffer(int buffer, int nvals)
{
m_input_buffers[buffer].clear();
m_input_indices[buffer].clear();
long end_id = (m_created_chunks + 1) * m_buffer_size;
while(m_current_processed_value < m_global_to_local_id.size() && m_global_to_local_id[m_current_processed_value].first < end_id)
{
m_input_buffers[buffer].push_back(m_values[m_global_to_local_id[m_current_processed_value].second]);
m_input_indices[buffer].push_back(m_global_to_local_id[m_current_processed_value].first);
++m_current_processed_value;
}
m_receiveinsizes[buffer] = m_input_buffers[buffer].size();
}
void test_stage_one(int i)
{
MPI_Wait(&m_multirequests[2 * i], MPI_STATUS_IGNORE);
if(m_global_comm.rank() == i * m_decimation)
{
std::partial_sum(m_receivesizes.begin(), m_receivesizes.end(), m_offset_buffers.begin() + 1);
m_output_buffers.resize(*m_offset_buffers.rbegin());
m_output_indices.resize(*m_offset_buffers.rbegin());
}
MPI_Igatherv(m_input_buffers[i].data(), m_input_buffers[i].size(), MPITraits<T>::get_MPI_type(), m_output_buffers.data(), m_receivesizes.data(), m_offset_buffers.data(), MPITraits<T>::get_MPI_type(), i * m_decimation, m_global_comm, &m_multirequests[2 * i]);
MPI_Igatherv(m_input_indices[i].data(), m_input_indices[i].size(), MPITraits<int64_t>::get_MPI_type(), m_output_indices.data(), m_receivesizes.data(), m_offset_buffers.data(), MPITraits<int64_t>::get_MPI_type(), i * m_decimation, m_global_comm, &m_multirequests[2 * i + 1]);
m_requests_state[i] = 1;
}
void test_stage_two(int i)
{
MPI_Wait(&m_multirequests[2 * i], MPI_STATUS_IGNORE);
MPI_Wait(&m_multirequests[2 * i + 1], MPI_STATUS_IGNORE);
m_requests_state[i] = 2;
}
template<class Writer>
void test_stage_three(int i, const Writer& callback_writer)
{
if(m_global_comm.rank() == i * m_decimation)
{
m_chunk.resize(std::min(m_total_number_of_values - m_offset, m_buffer_size));
write_data(m_output_buffers, m_output_indices, m_chunk, m_offset);
if(m_future.valid())
{
m_future.wait();
}
m_thread_chunk.swap(m_chunk);
// std::launch::async forces the write onto a worker thread instead of a possibly deferred call
m_future = std::async(std::launch::async, callback_writer, m_thread_chunk.data(), m_thread_chunk.size(), m_offset);
}
m_requests_state[i] = -1;
++m_processed_chunks;
}
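// Scatter the gathered values into the output chunk: each value lands at its global
// index minus the chunk's file offset.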
void write_data(const std::vector<T>& data, const std::vector<long>& all_index, std::vector<T>& chunk, long offset) const
{
// all_index holds global indices; shift them back into the local chunk window
std::size_t size = all_index.size();
for(std::size_t i = 0; i < size; ++i)
{
chunk[all_index[i] - offset] = data[i];
}
}
};
#endif