Skip to content

Instantly share code, notes, and snippets.

@nariaki3551
Last active July 10, 2021 05:44
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save nariaki3551/00e5cbe316cb5a3ac299d389da7714f0 to your computer and use it in GitHub Desktop.
Save nariaki3551/00e5cbe316cb5a3ac299d389da7714f0 to your computer and use it in GitHub Desktop.
#include <iostream>
#include <mpi.h>
#include <vector>
#include <deque>
#include <progresscpp/ProgressBar.hpp>
int main( int argc, char *argv[] ) {
int size, rank;
MPI_Init( &argc, &argv );
MPI_Comm_size( MPI_COMM_WORLD, &size );
MPI_Comm_rank( MPI_COMM_WORLD, &rank );
int n = 1000000; // job数
int N = 100; // rank>0の実行job数の送信間隔
int M = 100; // rank=0の受信確認間隔
int job_tag = 0; // job数送信時に使用するtag
int fin_tag = 1; // 全てのjobの完了時に使用するtag
int flag; // メッセージパッシング用
MPI_Status *status; // メッセージパッシング用
MPI_Request *request; // メッセージパッシング用
// initialize the bar
progresscpp::ProgressBar progressBar(n, 70);
///
/// iprobe + receive
///
auto iprobeRecv = [&flag, &status, &job_tag, &fin_tag, &progressBar](int source, int tag) {
// rank=sourceからメッセージが来ていないかを確認
MPI_Iprobe(source, tag, MPI_COMM_WORLD, &flag, status);
if ( flag == 1 ) {
// メッセージが来ていたら受信
if ( tag == job_tag ) {
int k = 0;
MPI_Recv(&k, 1, MPI_INT, source, tag, MPI_COMM_WORLD, status);
for ( int i = 0; i < k; i++ ) ++progressBar;
} else if ( tag == fin_tag ) {
MPI_Recv(NULL, 0, MPI_BYTE, source, fin_tag, MPI_COMM_WORLD, status);
}
}
return flag;
};
///
/// main
///
std::deque< std::pair<MPI_Request, int> > deq_req; // Isendオブジェクトの保存用
int k = 0;
for ( int i = rank; i < n; i += size ) {
// 何かしらのjobをした
++k;
if( rank == 0 ) {
if ( k > M ) {
// メッセージを受信しているかを確認
for ( int source = 1; source < size; source++ ) {
iprobeRecv(source, job_tag);
}
k = 0;
}
} else {
if ( k > N ) {
// 送信済みrequestを削除
for ( auto it = deq_req.begin(); it != deq_req.end(); ) {
MPI_Test(&(it->first), &flag, status);
if ( flag == 1 )
it = deq_req.erase(it);
else
++it;
}
// 前回送信から新たに行ったjob数を送信
auto req_pair = std::make_pair(MPI_Request(), k);
deq_req.push_back(req_pair); // 要素をコピーして末尾に追加
MPI_Isend(&(deq_req.back().second), 1, MPI_INT, 0, job_tag, MPI_COMM_WORLD, &(deq_req.back().first) ););
k = 0;
}
}
// 進捗バーを表示
if ( rank == 0 ) {
++progressBar;
progressBar.display();
}
}
///
/// Post Process
///
if ( rank > 0 ) {
for ( auto &req_pair : deq_req ) {
MPI_Wait(&(req_pair.first), status);
}
MPI_Send(&k, 1, MPI_INT, 0, job_tag, MPI_COMM_WORLD);
MPI_Send(NULL, 0, MPI_BYTE, 0, fin_tag, MPI_COMM_WORLD);
} else {
for ( int source = 1; source < size; source++ ) {
// end_tagを受け取るまでiprob
while ( !iprobeRecv(source, fin_tag) ) {
iprobeRecv(source, job_tag);
}
progressBar.display();
}
progressBar.display();
std::cout << std::endl;
}
MPI_Finalize();
return 0;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment