Created
February 11, 2013 14:05
-
-
Save lindenb/4754583 to your computer and use it in GitHub Desktop.
My first Open-MPI program: reads the reads in one or more BAM files
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
SAMDIR=/commun/data/packages/samtools-0.1.18 | |
OPENMPI.dir=/commun/data/packages/openmpi | |
CC=$(OPENMPI.dir)/bin/ortecc | |
.PHONY: all | |
all: ./ompibam | |
./ompibam: ompibam.c | |
$(CC) -o $@ -O3 -I$(SAMDIR) \ | |
-Wl,-rpath -Wl,$(OPENMPI.dir)/lib \ | |
-L$(SAMDIR) -Wall $< -lmpi -lbam -lz |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** | |
* Author : Pierre Lindenbaum PhD | |
* Date : 2013 | |
* WWW: http://plindenbaum.blogspot.com | |
* Motivation: | |
* my first Open-MPI source. Takes a list of BAM files | |
* and count the number of reads. | |
*/ | |
#include <stdio.h> | |
#include <stdlib.h> | |
#include <string.h> | |
#include <mpi.h> | |
#include "sam.h" | |
/** message send to the node: "count the read in a bam" */ | |
#define TAG_COUNT_BAM 1 | |
/** message send to the node: "shutdown" */ | |
#define TAG_RELEASE_SLAVE 2 | |
/** | |
* fixed-size structure send to the nodes. contains the BAMs and the counts of reads | |
*/ | |
typedef struct Params_t | |
{ | |
char filename[FILENAME_MAX]; | |
int is_error; | |
long count_total; | |
long count_mapped; | |
long count_properly_paired; | |
long count_dup; | |
}Params; | |
#define IS_FLAG_SET(b,f) ((b->core.flag & (f))!=0) | |
/** | |
* count the reads in a BAM file using the SAMTOOLS API | |
* fills the structure Params* | |
*/ | |
static void count_reads(Params* p) | |
{ | |
int ret; | |
bam1_t* b; | |
bamFile fp=bam_open(p->filename, "r") ; | |
bam_header_t *header; | |
if(fp==0) | |
{ | |
fprintf(stderr,"Cannot open %s\n",p->filename); | |
p->is_error=1; | |
return; | |
} | |
header= bam_header_read(fp); | |
b=bam_init1(); | |
while ((ret = bam_read1(fp, b)) >= 0) | |
{ | |
p->count_total++; | |
if(IS_FLAG_SET(b,BAM_FUNMAP)) continue; | |
p->count_mapped++; | |
if(IS_FLAG_SET(b,BAM_FPROPER_PAIR)) | |
{ | |
p->count_properly_paired++; | |
} | |
if(IS_FLAG_SET(b,BAM_FDUP)) | |
{ | |
p->count_dup++; | |
} | |
} | |
bam_destroy1(b); | |
bam_header_destroy(header); | |
bam_close(fp); | |
} | |
/** | |
* This is the slave method, it receive a message 'Params' from the master method | |
* if the status is TAG_RELEASE_SLAVE the slave is released | |
* else the bam in param.filename is analyszed and the message is sent back to the master | |
*/ | |
static void slave() | |
{ | |
for(;;) | |
{ | |
MPI_Status status; | |
Params param; | |
/* Receive a message from the master */ | |
MPI_Recv( | |
¶m, | |
sizeof(Params), | |
MPI_CHAR, | |
0, | |
MPI_ANY_TAG, | |
MPI_COMM_WORLD, | |
&status); | |
if(status.MPI_TAG==TAG_RELEASE_SLAVE) | |
{ | |
break; | |
} | |
count_reads(¶m); | |
/* Send the result back */ | |
MPI_Send( | |
¶m, | |
sizeof(Params), | |
MPI_CHAR, | |
0, | |
0, | |
MPI_COMM_WORLD | |
); | |
} | |
} | |
/** | |
* this is the master method, while there is | |
* a bam in the queue, send the analysis to the slaves. | |
* At the end, close the slaves | |
*/ | |
static void master(const int n_bams,char** argv) | |
{ | |
int rank=0; | |
int optind=0; | |
int proc_count=0; | |
/* get number of available processor proc_count */ | |
if(MPI_Comm_size(MPI_COMM_WORLD, &proc_count)!=MPI_SUCCESS ) | |
{ | |
return; | |
} | |
printf("#filename\ttotal\tmapped\tproperly_paired\tdup\n"); | |
/* loop over the bam files */ | |
while(optind< n_bams) | |
{ | |
int i=0; | |
/* number of message sent */ | |
int count_sent=0; | |
/* loop over the available procs */ | |
for (rank = 1; | |
rank <proc_count && optind+count_sent < n_bams; | |
++rank) | |
{ | |
/* create a new param */ | |
Params param; | |
/** fill to '0' */ | |
memset((void*)¶m,0,sizeof(Params)); | |
/* set the BAM filename */ | |
strcpy(param.filename,argv[count_sent+optind]); | |
/* Send it to each rank */ | |
MPI_Send( | |
¶m, /* message buffer */ | |
sizeof(Params), /* one data item */ | |
MPI_CHAR, /* data item is an array of bytes */ | |
rank, /* destination process rank */ | |
TAG_COUNT_BAM, /* user chosen message tag */ | |
MPI_COMM_WORLD /* default communicator */ | |
); | |
++count_sent; | |
} | |
/* now we get the results */ | |
for(i=0;i< count_sent;++i) | |
{ | |
/* get a message from the slave */ | |
Params param; | |
MPI_Status status; | |
MPI_Recv(¶m, /* message buffer */ | |
sizeof(Params), /* one data item */ | |
MPI_CHAR, /* data item is an array of bytes */ | |
MPI_ANY_SOURCE, /* receive from any sender */ | |
MPI_ANY_TAG, /* any type of message */ | |
MPI_COMM_WORLD, /* default communicator */ | |
&status); /* info about the received message */ | |
if(param.is_error!=0) | |
{ | |
fprintf(stderr,"Error for %s\n",param.filename); | |
} | |
else /* print the result */ | |
{ | |
printf("%s\t%ld\t%ld\t%ld\t%ld\n", | |
param.filename, | |
param.count_total, | |
param.count_mapped, | |
param.count_properly_paired, | |
param.count_dup | |
); | |
} | |
} | |
if(count_sent==0) | |
{ | |
break; | |
} | |
optind+=count_sent; | |
} | |
/* shutdown the slave */ | |
for (rank = 1; rank < proc_count; ++rank) | |
{ | |
Params empty; | |
MPI_Send( | |
&empty, /* message buffer */ | |
sizeof(Params), /* one data item */ | |
MPI_CHAR, /* data item is an array of bytes */ | |
rank, /* destination process rank */ | |
TAG_RELEASE_SLAVE, /* SHutdown */ | |
MPI_COMM_WORLD | |
); /* default communicator */ | |
} | |
} | |
int main(int argc, char *argv[]) | |
{ | |
int rank=0; | |
/* Initialisation */ | |
MPI_Init(&argc, &argv); | |
/* Find out my identity in the default communicator */ | |
MPI_Comm_rank(MPI_COMM_WORLD, &rank); | |
/* rand is zero = I'm the master */ | |
if (rank == 0) | |
{ | |
master(argc-1,&argv[1]); | |
} | |
else /* I'm a slave */ | |
{ | |
slave(argc-1,&argv[1]); | |
} | |
/* Shut down MPI */ | |
MPI_Finalize(); | |
return EXIT_SUCCESS; | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment