Created
March 4, 2016 15:28
-
-
Save lindenb/4ae32a75206e9a2d5d1e to your computer and use it in GitHub Desktop.
source code for using java.util.stream with a VCF keywords: ngs vcf VCF stream iteratror variantcontext parallel
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
HTSJDK=path/to/htsjdk-2.0.1.jar:path/to/apache-ant-1.8.2-bzip2.jar:path/to/commons-compress-1.4.1.jar:path/to/commons-jexl-2.1.1.jar:path/to/commons-logging-1.1.1.jar:path/to/ngs-java-1.2.2.jar:path/to/snappy-java-1.0.3-rc3.jar:path/to/xz-1.5.jar | |
all: | |
rm -rf tmp | |
mkdir -p tmp | |
javac -d tmp -cp ${HTSJDK}:. Test.java | |
java -cp ${HTSJDK}:tmp Test path/to/dbsnp_138.b37.vcf |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.io.File; | |
import java.util.Spliterator; | |
import java.util.stream.Stream; | |
import java.util.stream.StreamSupport; | |
import htsjdk.samtools.util.CloserUtil; | |
import htsjdk.samtools.util.RuntimeIOException; | |
import htsjdk.variant.variantcontext.VariantContext; | |
import htsjdk.samtools.SAMSequenceDictionary; | |
import htsjdk.samtools.SAMSequenceRecord; | |
import htsjdk.samtools.util.CloseableIterator; | |
import htsjdk.samtools.util.CloserUtil; | |
import htsjdk.variant.variantcontext.VariantContext; | |
import htsjdk.variant.vcf.VCFFileReader; | |
public class Test { | |
private static class StreameableVcfFile { | |
protected final File vcfFile; | |
StreameableVcfFile(final File vcfFile) { | |
this.vcfFile = vcfFile; | |
} | |
public Stream<VariantContext> stream() { | |
return stream(false); | |
} | |
public Stream<VariantContext> parallelStream() { | |
return stream(true); | |
} | |
private Stream<VariantContext> stream(boolean parallel) { | |
return StreamSupport.stream(new VariantContextSpliterator(this.vcfFile), parallel); | |
} | |
} | |
public static void main(String[] args) { | |
try { | |
final File vcFile =new File(args[0]); | |
StreameableVcfFile test= new StreameableVcfFile(vcFile); | |
long start1 = System.currentTimeMillis(); | |
System.out.println("count: "+test.parallelStream().count()); | |
long end1 = System.currentTimeMillis(); | |
System.out.println(" parallelstream: " + ((end1 - start1) / 1000)); | |
long start2 = System.currentTimeMillis(); | |
System.out.println("count: "+test.stream().count()); | |
long end2 = System.currentTimeMillis(); | |
System.out.println("stream : " + ((end2 - start2) / 1000)); | |
long start3 = System.currentTimeMillis(); | |
CloseableIterator<VariantContext> r= new VCFFileReader(vcFile).iterator(); | |
int n=0; | |
while(r.hasNext()) { r.next(); ++n;} | |
r.close(); | |
long end3 = System.currentTimeMillis(); | |
System.out.println("count: "+n); | |
System.out.println("iter : " + ((end3 - start3) / 1000)); | |
} catch(Exception err) { | |
err.printStackTrace(); | |
} | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.util.concurrent.atomic.AtomicInteger ; | |
import java.io.Closeable; | |
import java.io.File; | |
import java.util.Spliterator; | |
import java.util.function.Consumer; | |
import htsjdk.samtools.SAMSequenceDictionary; | |
import htsjdk.samtools.SAMSequenceRecord; | |
import htsjdk.samtools.util.CloseableIterator; | |
import htsjdk.samtools.util.CloserUtil; | |
import htsjdk.variant.variantcontext.VariantContext; | |
import htsjdk.variant.vcf.VCFFileReader; | |
public class VariantContextSpliterator implements Closeable,Spliterator<VariantContext> { | |
/** path to the original VCF file */ | |
private final File vcfFile; | |
/** current VCF File reader */ | |
private VCFFileReader vcfFileReader = null; | |
/** genome dictionary */ | |
private final SAMSequenceDictionary dict ; | |
/** current variant iterator */ | |
private CloseableIterator<VariantContext> variantIter = null; | |
/** closed flag */ | |
private boolean is_closed=false; | |
/* region start */ | |
private ContigPos begin; | |
/** region end */ | |
private ContigPos end ; | |
/** next Variant to be used */ | |
private VariantContext _peeked= null; | |
/** number of opened VCF iterators */ | |
private final AtomicInteger nsplits ; | |
private class ContigPos { | |
/** contig/chromosome index in the dictionary */ | |
final int tid; | |
/** contig/chromosome name */ | |
final String contig; | |
/** position in the chromosome */ | |
final int pos; | |
ContigPos(long idx) { | |
long n=0; | |
for(int i=0;i< dict.size();++i) { | |
final SAMSequenceRecord rec=dict.getSequence(i); | |
if( i+1==dict.size() || (n<=idx && idx <= n+ rec.getSequenceLength())) { | |
this.tid = i; | |
this.contig = rec.getSequenceName(); | |
this.pos= (int)(idx-n); | |
return ; | |
} | |
n += rec.getSequenceLength(); | |
} | |
throw new IllegalStateException(); | |
} | |
ContigPos(final String contig,final int pos) { | |
this.contig = contig; | |
this.pos = pos; | |
this.tid = dict.getSequenceIndex(contig); | |
} | |
ContigPos(final int tid,final int pos) { | |
this.contig = dict.getSequence(tid).getSequenceName(); | |
this.pos = pos; | |
this.tid = tid; | |
} | |
long genomicIndex() { | |
long n=0L; | |
for(int i=0;i< this.tid;++i) { | |
n += dict.getSequence(i).getSequenceLength(); | |
} | |
return n+ this.pos; | |
} | |
public String toString() { | |
return contig+"("+tid+")"+pos+"="+genomicIndex(); | |
} | |
} | |
private VariantContextSpliterator( | |
final VariantContextSpliterator parent, | |
final ContigPos start_genomic_index, | |
final ContigPos max_genomic_index | |
) { | |
this.vcfFile = parent.vcfFile; | |
this.dict = parent.dict; | |
this.nsplits = parent.nsplits; | |
this.begin = start_genomic_index; | |
this.end = max_genomic_index; | |
} | |
public VariantContextSpliterator(final File vcfFile) { | |
this.vcfFile = vcfFile; | |
this.dict = VCFFileReader.getSequenceDictionary(vcfFile); | |
this.begin = new ContigPos(this.dict.getSequence(0).getSequenceName(),0); | |
final SAMSequenceRecord rec= this.dict.getSequence(this.dict.size() - 1); | |
this.end = new ContigPos(rec.getSequenceName(),rec.getSequenceLength()+1); | |
this.nsplits = new AtomicInteger(0); | |
} | |
public String toString() { | |
return "start="+begin+" end="+end; | |
} | |
@Override | |
public void close() { | |
if(!is_closed ) { | |
//System.err.println("Close"+toString()); | |
CloserUtil.close(variantIter); | |
variantIter = null; | |
CloserUtil.close(vcfFileReader); | |
vcfFileReader = null; | |
is_closed = true; | |
int n= this.nsplits.decrementAndGet(); | |
} | |
} | |
private VariantContext peek() { | |
if( this.is_closed ) return null; | |
if( _peeked != null) { return _peeked;} | |
/** VCF reader was never opened */ | |
if( this.vcfFileReader == null ) { | |
/** open the VCF reader */ | |
this.vcfFileReader = new VCFFileReader(this.vcfFile, true); | |
/** open a first iterator on the first chromosome */ | |
this.variantIter = this.vcfFileReader.query( | |
this.begin.contig, | |
this.begin.pos, | |
this.dict.getSequence(this.begin.tid).getSequenceLength() /* whole chromosome size */ | |
); | |
/** increase the number of opened streams */ | |
this.nsplits.incrementAndGet(); | |
} | |
/** while there is no more variant available on this chromosome , open the next chromosome for reading */ | |
while(!this.variantIter.hasNext()) { | |
/* close previous iterator */ | |
this.variantIter.close(); | |
this.variantIter = null; | |
if(this.begin.tid == this.end.tid) /* this is the last chromosome */ | |
{ | |
close(); | |
return null; | |
} | |
else | |
{ | |
this.begin = new ContigPos(this.begin.tid+1, 0); | |
this.variantIter = this.vcfFileReader.query( | |
this.begin.contig, | |
this.begin.pos, | |
this.dict.getSequence(this.begin.tid).getSequenceLength() /* whole chromosome size */ | |
); | |
} | |
} | |
/* get the next variant */ | |
final VariantContext ctx = this.variantIter.next(); | |
/* update 'begin' */ | |
this.begin= new ContigPos(ctx.getContig(), ctx.getStart()); | |
/** close if the end of the genomic location was reached */ | |
if((this.begin.tid > this.end.tid) || | |
(this.begin.tid == this.end.tid && this.begin.pos >= this.end.pos) ) { | |
close(); | |
return null; | |
} | |
this._peeked = ctx; | |
return this._peeked; | |
} | |
private VariantContext next() { | |
final VariantContext ctx = this.peek(); | |
if(ctx==null) return null; | |
this._peeked=null; | |
return ctx; | |
} | |
/** If a remaining element exists, performs the given action on it, returning true; else returns false. */ | |
@Override | |
public boolean tryAdvance(Consumer<? super VariantContext> action) { | |
final VariantContext ctx = this.next(); | |
if(ctx==null) { | |
close(); | |
return false; | |
} | |
action.accept(ctx); | |
return true; | |
} | |
@Override | |
public Spliterator<VariantContext> trySplit() { | |
final VariantContext ctx = this.peek(); | |
/** no more variant to read, can't split */ | |
if(ctx==null) return null; | |
/** too many opened VCFFile reader, can't split */ | |
if( this.nsplits.get() > 5) return null; | |
long start = this.begin.genomicIndex(); | |
long distance = this.end.genomicIndex() - start; | |
/** distance between begin and end is greater than 1Mb */ | |
while(distance > 1E6 ) | |
{ | |
distance = distance/2; | |
/** middle genomic index */ | |
final ContigPos mid = new ContigPos(start + distance); | |
/** create a new VariantContextSpliterator starting from mid and ending at this.end */ | |
final VariantContextSpliterator next = new VariantContextSpliterator(this,mid,this.end); | |
if(next.peek()==null) { | |
next.close(); | |
continue; | |
} | |
/* update this.end to 'mid' */ | |
this.end= mid; | |
//System.err.println("split successful:after split "+toString()+" and next="+next); | |
return next; | |
} | |
return null; | |
} | |
@Override | |
public long estimateSize() { | |
return Long.MAX_VALUE; | |
} | |
@Override | |
public int characteristics() { | |
return DISTINCT | NONNULL | IMMUTABLE; | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment