Skip to content

Instantly share code, notes, and snippets.

@lindenb
Created February 11, 2013 14:05
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save lindenb/4754583 to your computer and use it in GitHub Desktop.
Save lindenb/4754583 to your computer and use it in GitHub Desktop.
My first Open-MPI program: reads the reads in one or more BAM files
SAMDIR=/commun/data/packages/samtools-0.1.18
OPENMPI.dir=/commun/data/packages/openmpi
CC=$(OPENMPI.dir)/bin/ortecc
.PHONY: all
all: ./ompibam
./ompibam: ompibam.c
$(CC) -o $@ -O3 -I$(SAMDIR) \
-Wl,-rpath -Wl,$(OPENMPI.dir)/lib \
-L$(SAMDIR) -Wall $< -lmpi -lbam -lz
/**
* Author : Pierre Lindenbaum PhD
* Date : 2013
* WWW: http://plindenbaum.blogspot.com
* Motivation:
* my first Open-MPI source. Takes a list of BAM files
* and count the number of reads.
*/
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <mpi.h>
#include "sam.h"
/** message send to the node: "count the read in a bam" */
#define TAG_COUNT_BAM 1
/** message send to the node: "shutdown" */
#define TAG_RELEASE_SLAVE 2
/**
* fixed-size structure send to the nodes. contains the BAMs and the counts of reads
*/
typedef struct Params_t
{
char filename[FILENAME_MAX];
int is_error;
long count_total;
long count_mapped;
long count_properly_paired;
long count_dup;
}Params;
#define IS_FLAG_SET(b,f) ((b->core.flag & (f))!=0)
/**
* count the reads in a BAM file using the SAMTOOLS API
* fills the structure Params*
*/
static void count_reads(Params* p)
{
int ret;
bam1_t* b;
bamFile fp=bam_open(p->filename, "r") ;
bam_header_t *header;
if(fp==0)
{
fprintf(stderr,"Cannot open %s\n",p->filename);
p->is_error=1;
return;
}
header= bam_header_read(fp);
b=bam_init1();
while ((ret = bam_read1(fp, b)) >= 0)
{
p->count_total++;
if(IS_FLAG_SET(b,BAM_FUNMAP)) continue;
p->count_mapped++;
if(IS_FLAG_SET(b,BAM_FPROPER_PAIR))
{
p->count_properly_paired++;
}
if(IS_FLAG_SET(b,BAM_FDUP))
{
p->count_dup++;
}
}
bam_destroy1(b);
bam_header_destroy(header);
bam_close(fp);
}
/**
* This is the slave method, it receive a message 'Params' from the master method
* if the status is TAG_RELEASE_SLAVE the slave is released
* else the bam in param.filename is analyszed and the message is sent back to the master
*/
static void slave()
{
for(;;)
{
MPI_Status status;
Params param;
/* Receive a message from the master */
MPI_Recv(
&param,
sizeof(Params),
MPI_CHAR,
0,
MPI_ANY_TAG,
MPI_COMM_WORLD,
&status);
if(status.MPI_TAG==TAG_RELEASE_SLAVE)
{
break;
}
count_reads(&param);
/* Send the result back */
MPI_Send(
&param,
sizeof(Params),
MPI_CHAR,
0,
0,
MPI_COMM_WORLD
);
}
}
/**
* this is the master method, while there is
* a bam in the queue, send the analysis to the slaves.
* At the end, close the slaves
*/
static void master(const int n_bams,char** argv)
{
int rank=0;
int optind=0;
int proc_count=0;
/* get number of available processor proc_count */
if(MPI_Comm_size(MPI_COMM_WORLD, &proc_count)!=MPI_SUCCESS )
{
return;
}
printf("#filename\ttotal\tmapped\tproperly_paired\tdup\n");
/* loop over the bam files */
while(optind< n_bams)
{
int i=0;
/* number of message sent */
int count_sent=0;
/* loop over the available procs */
for (rank = 1;
rank <proc_count && optind+count_sent < n_bams;
++rank)
{
/* create a new param */
Params param;
/** fill to '0' */
memset((void*)&param,0,sizeof(Params));
/* set the BAM filename */
strcpy(param.filename,argv[count_sent+optind]);
/* Send it to each rank */
MPI_Send(
&param, /* message buffer */
sizeof(Params), /* one data item */
MPI_CHAR, /* data item is an array of bytes */
rank, /* destination process rank */
TAG_COUNT_BAM, /* user chosen message tag */
MPI_COMM_WORLD /* default communicator */
);
++count_sent;
}
/* now we get the results */
for(i=0;i< count_sent;++i)
{
/* get a message from the slave */
Params param;
MPI_Status status;
MPI_Recv(&param, /* message buffer */
sizeof(Params), /* one data item */
MPI_CHAR, /* data item is an array of bytes */
MPI_ANY_SOURCE, /* receive from any sender */
MPI_ANY_TAG, /* any type of message */
MPI_COMM_WORLD, /* default communicator */
&status); /* info about the received message */
if(param.is_error!=0)
{
fprintf(stderr,"Error for %s\n",param.filename);
}
else /* print the result */
{
printf("%s\t%ld\t%ld\t%ld\t%ld\n",
param.filename,
param.count_total,
param.count_mapped,
param.count_properly_paired,
param.count_dup
);
}
}
if(count_sent==0)
{
break;
}
optind+=count_sent;
}
/* shutdown the slave */
for (rank = 1; rank < proc_count; ++rank)
{
Params empty;
MPI_Send(
&empty, /* message buffer */
sizeof(Params), /* one data item */
MPI_CHAR, /* data item is an array of bytes */
rank, /* destination process rank */
TAG_RELEASE_SLAVE, /* SHutdown */
MPI_COMM_WORLD
); /* default communicator */
}
}
int main(int argc, char *argv[])
{
int rank=0;
/* Initialisation */
MPI_Init(&argc, &argv);
/* Find out my identity in the default communicator */
MPI_Comm_rank(MPI_COMM_WORLD, &rank);
/* rand is zero = I'm the master */
if (rank == 0)
{
master(argc-1,&argv[1]);
}
else /* I'm a slave */
{
slave(argc-1,&argv[1]);
}
/* Shut down MPI */
MPI_Finalize();
return EXIT_SUCCESS;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment