Skip to content

Instantly share code, notes, and snippets.

@cwsmith
Created September 19, 2012 13:01
Show Gist options
  • Save cwsmith/3749525 to your computer and use it in GitHub Desktop.
Save cwsmith/3749525 to your computer and use it in GitHub Desktop.
luby mis
#include <algorithm>
#include <limits.h>
#include <time.h>
#include <stdlib.h>
#include <mpi.h>
#include <assert.h>
#include <map>
#include "mis.h"
using std::find;
using std::copy;
using std::back_inserter;
using std::map;
using std::make_pair;
using std::min_element;
using std::stable_sort;
using std::set_intersection;
#define MIS_DEBUG 1
void debugPrint(char* msg) {
if ( MIS_DEBUG ) {
printf("DEBUG %s", msg);
}
}
inline int GetGlobalPartId(int rank, int partIdx, int numParts) {
return rank*numParts+partIdx;
}
inline int GetOwningProcessRank(int globalPartId, int numParts) {
assert(numParts>=1);
return globalPartId/numParts;
}
/**
* @brief generate n random numbers
* @param n (In) number of random numbers to generate
* @param randNums (InOut) pre-allocated array of size n
* @return 0 on success, non-zero otherwise
*/
int generateRandomNumbers(int n, int* randNums) {
srand(time(NULL));
for ( int i=0; i<n; i++) {
randNums[i] = rand();
}
return 0;
}
//initialize the communication library
void initMPComm(MPComm& mpcom, vector<partInfo>& parts) {
int rank;
MPI_Comm_rank(MPI_COMM_WORLD, &rank);
mpcom.numParts = parts.size();
create_neighbors(&(mpcom.nbors));
struct neighbor* nbor;
vector<partInfo>::iterator partItr;
for (partItr = parts.begin();
partItr != parts.end();
partItr++) {
vector<int>::iterator adjPartItr;
for (adjPartItr = partItr->adjPartIds.begin();
adjPartItr != partItr->adjPartIds.end();
adjPartItr++) {
int nbRank = GetOwningProcessRank(*adjPartItr, parts.size());
printf("[%d] has nbor %d\n", rank, nbRank);
nbor = make_or_get_neighbor(&(mpcom.nbors),nbRank);
}
}
//neighbors = {{processes owning adjacent parts},local process}
int nbCount = 0;
//neighbors_reset(&(mpcom.nbors));
for (struct neighbor* n = begin_neighbors(&(mpcom.nbors));
n != NULL; n = next_neighbor(n)) {
++nbCount;
printf("DEBUG bsp after [%d] sending to %d\n", rank, n->message.peer);
}
}
void finalizeMPComm(MPComm& mpcom) {
neighbors_dealloc(&(mpcom.nbors));
}
void sendIntsToNeighbors2(MPComm& mpcom, vector<partInfo>& parts ) {
int rank;
MPI_Comm_rank(MPI_COMM_WORLD, &rank);
//allocate
struct neighbor* nb;
vector<int>::iterator adjPartIdItr;
int numNbA = 0;
neighbors_dealloc(&(mpcom.nbors));
vector<partInfo>::iterator partItr;
for (partItr = parts.begin();
partItr != parts.end();
partItr++) {
const int srcRank = GetOwningProcessRank(partItr->id, mpcom.numParts);
assert(rank == srcRank);
for (adjPartIdItr = partItr->adjPartIds.begin();
adjPartIdItr != partItr->adjPartIds.end();
adjPartIdItr++) {
const int destRank = GetOwningProcessRank(*adjPartIdItr, mpcom.numParts);
nb = get_neighbor(&(mpcom.nbors), (natural_t) destRank);
assert(nb != NULL);
//source part Id
buffer_push_size(&(nb->message.buffer), sizeof (int));
//destination part Id
buffer_push_size(&(nb->message.buffer), sizeof (int));
//int array
buffer_push_size(&(nb->message.buffer), partItr->net.size() * sizeof (int));
++numNbA;
}
}
neighbors_alloc_size(&(mpcom.nbors));
//pack
neighbors_reset(&(mpcom.nbors));
int numNbB = 0;
for (partItr = parts.begin();
partItr != parts.end();
partItr++) {
for (adjPartIdItr = partItr->adjPartIds.begin();
adjPartIdItr != partItr->adjPartIds.end();
adjPartIdItr++) {
const int destRank = GetOwningProcessRank(*adjPartIdItr, mpcom.numParts);
nb = get_neighbor(&(mpcom.nbors), (natural_t) destRank);
assert(nb != NULL);
//pack source part Id
int* buff = (int*) buffer_traverse(&(nb->message.buffer), sizeof (int));
*buff = partItr->id;
//pack destination part Id
buff = (int*) buffer_traverse(&(nb->message.buffer), sizeof (int));
*buff = *adjPartIdItr;
//pack int array
buff = (int*) buffer_traverse(&(nb->message.buffer), partItr->net.size() * sizeof (int));
copy(partItr->net.begin(), partItr->net.end(), buff);
printf("DEBUG [%d] sending to %d (%d)\n", rank, nb->message.peer, destRank);
++numNbB;
}
}
assert(numNbA == numNbB);
}
void sendIntsToNeighbors(MPComm& mpcom, int srcPartId, vector<int>& msg, vector<int> nborPartIds) {
exit(1);
}
void finalizeSend(MPComm& mpcom) {
//send
bsp_begin();
send_neighbors(&(mpcom.nbors));
}
void appendMessage(int rank, int srcPartId, int destPartId, struct neighbors* nbors,
struct message* m, int numIntsInBuff,
vector<partInfo>& parts, vector<int>*& msg) {
//find index to store message
int partIdx = 0;
for (; partIdx < parts.size(); partIdx++) {
if (parts[partIdx].id == destPartId) {
break;
}
}
assert(partIdx >= 0 && partIdx < parts.size());
//unpack int array
int* buff = (int*) buffer_traverse(&(m->buffer), numIntsInBuff * sizeof (int));
copy(buff, buff + numIntsInBuff, back_inserter(msg[partIdx]));
printf("DEBUG append [%3d] received from [%3d] srcPartId=%3d destPartId=%3d #ints=%3d\n", rank, (int)m->peer, srcPartId, destPartId, (int)numIntsInBuff);
}
void recvMergedIntsFromNeighbors2(MPComm& mpcom, vector<partInfo>& parts, vector<int>*& msg) {
int rank;
MPI_Comm_rank(MPI_COMM_WORLD, &rank);
struct in_message in;
create_in_message(&in);
while (!bsp_done_receiving()) {
if (neighbors_done_sending(&(mpcom.nbors))) {
bsp_done_sending();
}
if (receive_in_message(&in)) {
natural_t numIntsInBuff = in.message.buffer.size / sizeof(int);
buffer_reset(&(in.message.buffer));
//unpack source part Id
int* buff = (int*) buffer_traverse(&(in.message.buffer), sizeof (int));
const int srcPartId = *buff;
--numIntsInBuff;
//unpack destination part Id
buff = (int*) buffer_traverse(&(in.message.buffer), sizeof (int));
const int destPartId = *buff;
--numIntsInBuff;
printf("DEBUG [%3d] received from [%3d] srcPartId=%3d destPartId=%3d #ints=%3d\n", rank, (int)in.message.peer, srcPartId, destPartId, (int)numIntsInBuff);
appendMessage(rank, srcPartId, destPartId, &(mpcom.nbors), &(in.message), numIntsInBuff, parts, msg);
}
}
destroy_in_message(&in);
}
void recvMergedIntsFromNeighbors(MPComm& mpcom, vector<partInfo>& parts, vector<int>*& msg) {
exit(1);
// int rank;
// MPI_Comm_rank(MPI_COMM_WORLD, &rank);
//
// void *pvMsgRecv;
// int iPidFrom;
// vector<int>::iterator* msgItr = new vector<int>::iterator[parts.size()];
// for(int partIdx = 0; partIdx < parts.size(); partIdx++) {
// msgItr[partIdx] = msg[partIdx].begin();
// }
// while (int recvSize = mpcom.ipcm->receive(pvMsgRecv, &iPidFrom)) {
// int numIntsRecvd = recvSize/sizeof(int);
// int* piMsg = (int*) pvMsgRecv;
// const int srcPartId = piMsg[0];
// const int destPartId = piMsg[1];
// const int srcPartRank = GetOwningProcessRank(srcPartId, mpcom.numParts);
// const int destPartRank = GetOwningProcessRank(destPartId, mpcom.numParts);
//
// char dbgMsg[256];
// sprintf(dbgMsg, "[%d] received from rank %d (%d)\n", rank, srcPartRank, srcPartId);
// debugPrint(dbgMsg);
// assert(rank == destPartRank);
//
// int partIdx = 0;
// for(; partIdx < parts.size(); partIdx++) {
// if ( parts[partIdx].id == destPartId ) {
// break;
// }
// }
// assert(partIdx >= 0 && partIdx < parts.size());
//
// copy(&(piMsg[2]), &(piMsg[numIntsRecvd]), msgItr[partIdx]);
// msgItr[partIdx] = msg[partIdx].end();
//
// mpcom.ipcm->free_msg(pvMsgRecv);
// }
// delete [] msgItr;
}
void recvMappedIntsFromNeighbors(MPComm& mpcom, vector<partInfo>& parts) {
// void *pvMsgRecv;
// int iPidFrom;
// vector<int> netAdjPartSizes; //sanity check
// vector<partInfo>::iterator partItr;
// for (partItr = parts.begin();
// partItr != parts.end();
// partItr++) {
// netAdjPartSizes.push_back(partItr->netAdjParts.size());
// }
// while (int recvSize = mpcom.ipcm->receive(pvMsgRecv, &iPidFrom)) {
// int numIntsRecvd = recvSize / sizeof (int);
// int* piMsg = (int*) pvMsgRecv;
// const int srcPartId = piMsg[0];
// const int destPartId = piMsg[1];
//
// for (partItr = parts.begin();
// partItr != parts.end();
// partItr++) {
// if ( partItr->id == destPartId ) {
// partItr->netAdjParts[srcPartId] = piMsg[2];
// }
// }
//
// mpcom.ipcm->free_msg(pvMsgRecv);
// }
//
// for (int partIdx=0; partIdx < parts.size(); partIdx++) {
// if ( netAdjPartSizes[partIdx] != parts[partIdx].netAdjParts.size() ) {
// printf("WARNING: change in netAdjPart size init=%d final=%d\n",
// netAdjPartSizes[partIdx], parts[partIdx].netAdjParts.size());
// }
// assert(netAdjPartSizes[partIdx] == parts[partIdx].netAdjParts.size());
// }
}
int minRandNum(netAdjItr first, netAdjItr last) {
int min = INT_MAX;
for (netAdjItr itr = first; itr != last; itr++) {
if ( itr->second < min) {
min = itr->second;
}
}
return min;
}
void getNetAdjPartIds(netAdjItr first, netAdjItr last, vector<int>& ids) {
for (netAdjItr itr = first; itr != last; itr++) {
ids.push_back(itr->first);
}
}
void constructNetAdjPartInfo(MPComm& mpcom, vector<partInfo>& parts) {
// send net to each neighbor
sendIntsToNeighbors2(mpcom, parts);
finalizeSend(mpcom);
vector<int>* nbNet = new vector<int>[parts.size()];
recvMergedIntsFromNeighbors2(mpcom, parts, nbNet);
int partIdx = 0;
vector<partInfo>::iterator partItr;
for (partItr = parts.begin();
partItr != parts.end();
partItr++, partIdx++) {
stable_sort(nbNet[partIdx].begin(), nbNet[partIdx].end());
stable_sort(partItr->net.begin(), partItr->net.end());
vector<int> intersection;
set_intersection(nbNet[partIdx].begin(), nbNet[partIdx].end(), partItr->net.begin(), partItr->net.end(), intersection.begin());
vector<int>::iterator itr;
for (itr = intersection.begin(); itr != intersection.end(); itr++ ) {
partItr->netAdjParts[*itr] = 0;
}
}
exit(1);
partIdx = 0;
for (partItr = parts.begin();
partItr != parts.end();
partItr++, partIdx++) {
vector<int> msg(1,partItr->randNum);
sendIntsToNeighbors(mpcom, partItr->id, msg, partItr->adjPartIds);
}
finalizeSend(mpcom);
recvMappedIntsFromNeighbors(mpcom, parts);
delete [] nbNet;
}
void setRandomNums(int rank, int totNumParts, vector<partInfo>& parts) {
const int numParts = parts.size();
int* localRandNums = new int[numParts];
if (rank == 0) {
int* randNums = new int[totNumParts];
generateRandomNumbers(totNumParts, randNums);
MPI_Scatter(randNums, numParts, MPI_INT, localRandNums, 1, MPI_INT, 0, MPI_COMM_WORLD);
delete [] randNums;
} else {
MPI_Scatter(NULL, numParts, MPI_INT, localRandNums, 1, MPI_INT, 0, MPI_COMM_WORLD);
}
int partIdx = 0;
vector<partInfo>::iterator partItr;
for (partItr = parts.begin();
partItr != parts.end();
partItr++, partIdx++) {
partItr->randNum = localRandNums[partIdx];
}
delete [] localRandNums;
}
int mis(const int rank, const int totNumParts, vector<partInfo>& parts, vector<int>& mis) {
MPComm mpcom;
initMPComm(mpcom, parts);
setRandomNums(rank, totNumParts, parts);
constructNetAdjPartInfo(mpcom, parts);
exit(1);
vector<int>* nodesToRemove = new vector<int>[parts.size()];
vector<int>* rmtNodesToRemove = new vector<int>[parts.size()];
int totalNodesAdded = 0;
while (totalNodesAdded > 0) {
int numNodesAdded = 0;
vector<partInfo>::iterator partItr;
for (partItr = parts.begin();
partItr != parts.end();
partItr++) {
partItr->isInMIS = 0;
if (partItr->netAdjParts.size() > 0) {
const int minRand = minRandNum(partItr->netAdjParts.begin(), partItr->netAdjParts.end());
if (partItr->randNum < minRand) {
partItr->isInMIS = 1;
mis.push_back(partItr->id);
++numNodesAdded;
}
}
}
int partIdx = 0;
for (partItr = parts.begin();
partItr != parts.end();
partItr++, partIdx++) {
if ( 1 == partItr->isInMIS ) {
nodesToRemove[partIdx].reserve(partItr->netAdjParts.size()+1);
getNetAdjPartIds(partItr->netAdjParts.begin(),partItr->netAdjParts.end(), nodesToRemove[partIdx]);
nodesToRemove[partIdx].push_back(partItr->id);
sendIntsToNeighbors(mpcom, partItr->id, nodesToRemove[partIdx], partItr->adjPartIds);
} else {
sendIntsToNeighbors(mpcom, partItr->id, nodesToRemove[partIdx], partItr->adjPartIds);
}
}
finalizeSend(mpcom);
recvMergedIntsFromNeighbors(mpcom, parts, rmtNodesToRemove);
partIdx = 0;
for (partItr = parts.begin();
partItr != parts.end();
partItr++, partIdx++) {
if ( partItr->isInMIS == 1 ||
find(rmtNodesToRemove[partIdx].begin(),
rmtNodesToRemove[partIdx].end(),
partItr->id) == rmtNodesToRemove[partIdx].end() ) {
partItr->netAdjParts.clear();
} else {
vector<int>::iterator rmvNodeItr;
for (rmvNodeItr = rmtNodesToRemove[partIdx].begin();
rmvNodeItr != rmtNodesToRemove[partIdx].end();
rmvNodeItr++) {
partItr->netAdjParts.erase(*rmvNodeItr);
}
}
nodesToRemove[partIdx].clear();
rmtNodesToRemove[partIdx].clear();
}
MPI_Allreduce(&numNodesAdded, &totalNodesAdded, 1, MPI_INT, MPI_SUM, MPI_COMM_WORLD);
}
delete [] nodesToRemove;
delete [] rmtNodesToRemove;
finalizeMPComm(mpcom);
return 0;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment