Skip to content

Instantly share code, notes, and snippets.

@lindenb
Created March 4, 2016 15:28
Show Gist options
  • Save lindenb/4ae32a75206e9a2d5d1e to your computer and use it in GitHub Desktop.
Save lindenb/4ae32a75206e9a2d5d1e to your computer and use it in GitHub Desktop.
source code for using java.util.stream with a VCF keywords: ngs vcf VCF stream iteratror variantcontext parallel
HTSJDK=path/to/htsjdk-2.0.1.jar:path/to/apache-ant-1.8.2-bzip2.jar:path/to/commons-compress-1.4.1.jar:path/to/commons-jexl-2.1.1.jar:path/to/commons-logging-1.1.1.jar:path/to/ngs-java-1.2.2.jar:path/to/snappy-java-1.0.3-rc3.jar:path/to/xz-1.5.jar
all:
rm -rf tmp
mkdir -p tmp
javac -d tmp -cp ${HTSJDK}:. Test.java
java -cp ${HTSJDK}:tmp Test path/to/dbsnp_138.b37.vcf
import java.io.File;
import java.util.Spliterator;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import htsjdk.samtools.util.CloserUtil;
import htsjdk.samtools.util.RuntimeIOException;
import htsjdk.variant.variantcontext.VariantContext;
import htsjdk.samtools.SAMSequenceDictionary;
import htsjdk.samtools.SAMSequenceRecord;
import htsjdk.samtools.util.CloseableIterator;
import htsjdk.samtools.util.CloserUtil;
import htsjdk.variant.variantcontext.VariantContext;
import htsjdk.variant.vcf.VCFFileReader;
public class Test {
private static class StreameableVcfFile {
protected final File vcfFile;
StreameableVcfFile(final File vcfFile) {
this.vcfFile = vcfFile;
}
public Stream<VariantContext> stream() {
return stream(false);
}
public Stream<VariantContext> parallelStream() {
return stream(true);
}
private Stream<VariantContext> stream(boolean parallel) {
return StreamSupport.stream(new VariantContextSpliterator(this.vcfFile), parallel);
}
}
public static void main(String[] args) {
try {
final File vcFile =new File(args[0]);
StreameableVcfFile test= new StreameableVcfFile(vcFile);
long start1 = System.currentTimeMillis();
System.out.println("count: "+test.parallelStream().count());
long end1 = System.currentTimeMillis();
System.out.println(" parallelstream: " + ((end1 - start1) / 1000));
long start2 = System.currentTimeMillis();
System.out.println("count: "+test.stream().count());
long end2 = System.currentTimeMillis();
System.out.println("stream : " + ((end2 - start2) / 1000));
long start3 = System.currentTimeMillis();
CloseableIterator<VariantContext> r= new VCFFileReader(vcFile).iterator();
int n=0;
while(r.hasNext()) { r.next(); ++n;}
r.close();
long end3 = System.currentTimeMillis();
System.out.println("count: "+n);
System.out.println("iter : " + ((end3 - start3) / 1000));
} catch(Exception err) {
err.printStackTrace();
}
}
}
import java.util.concurrent.atomic.AtomicInteger ;
import java.io.Closeable;
import java.io.File;
import java.util.Spliterator;
import java.util.function.Consumer;
import htsjdk.samtools.SAMSequenceDictionary;
import htsjdk.samtools.SAMSequenceRecord;
import htsjdk.samtools.util.CloseableIterator;
import htsjdk.samtools.util.CloserUtil;
import htsjdk.variant.variantcontext.VariantContext;
import htsjdk.variant.vcf.VCFFileReader;
public class VariantContextSpliterator implements Closeable,Spliterator<VariantContext> {
/** path to the original VCF file */
private final File vcfFile;
/** current VCF File reader */
private VCFFileReader vcfFileReader = null;
/** genome dictionary */
private final SAMSequenceDictionary dict ;
/** current variant iterator */
private CloseableIterator<VariantContext> variantIter = null;
/** closed flag */
private boolean is_closed=false;
/* region start */
private ContigPos begin;
/** region end */
private ContigPos end ;
/** next Variant to be used */
private VariantContext _peeked= null;
/** number of opened VCF iterators */
private final AtomicInteger nsplits ;
private class ContigPos {
/** contig/chromosome index in the dictionary */
final int tid;
/** contig/chromosome name */
final String contig;
/** position in the chromosome */
final int pos;
ContigPos(long idx) {
long n=0;
for(int i=0;i< dict.size();++i) {
final SAMSequenceRecord rec=dict.getSequence(i);
if( i+1==dict.size() || (n<=idx && idx <= n+ rec.getSequenceLength())) {
this.tid = i;
this.contig = rec.getSequenceName();
this.pos= (int)(idx-n);
return ;
}
n += rec.getSequenceLength();
}
throw new IllegalStateException();
}
ContigPos(final String contig,final int pos) {
this.contig = contig;
this.pos = pos;
this.tid = dict.getSequenceIndex(contig);
}
ContigPos(final int tid,final int pos) {
this.contig = dict.getSequence(tid).getSequenceName();
this.pos = pos;
this.tid = tid;
}
long genomicIndex() {
long n=0L;
for(int i=0;i< this.tid;++i) {
n += dict.getSequence(i).getSequenceLength();
}
return n+ this.pos;
}
public String toString() {
return contig+"("+tid+")"+pos+"="+genomicIndex();
}
}
private VariantContextSpliterator(
final VariantContextSpliterator parent,
final ContigPos start_genomic_index,
final ContigPos max_genomic_index
) {
this.vcfFile = parent.vcfFile;
this.dict = parent.dict;
this.nsplits = parent.nsplits;
this.begin = start_genomic_index;
this.end = max_genomic_index;
}
public VariantContextSpliterator(final File vcfFile) {
this.vcfFile = vcfFile;
this.dict = VCFFileReader.getSequenceDictionary(vcfFile);
this.begin = new ContigPos(this.dict.getSequence(0).getSequenceName(),0);
final SAMSequenceRecord rec= this.dict.getSequence(this.dict.size() - 1);
this.end = new ContigPos(rec.getSequenceName(),rec.getSequenceLength()+1);
this.nsplits = new AtomicInteger(0);
}
public String toString() {
return "start="+begin+" end="+end;
}
@Override
public void close() {
if(!is_closed ) {
//System.err.println("Close"+toString());
CloserUtil.close(variantIter);
variantIter = null;
CloserUtil.close(vcfFileReader);
vcfFileReader = null;
is_closed = true;
int n= this.nsplits.decrementAndGet();
}
}
private VariantContext peek() {
if( this.is_closed ) return null;
if( _peeked != null) { return _peeked;}
/** VCF reader was never opened */
if( this.vcfFileReader == null ) {
/** open the VCF reader */
this.vcfFileReader = new VCFFileReader(this.vcfFile, true);
/** open a first iterator on the first chromosome */
this.variantIter = this.vcfFileReader.query(
this.begin.contig,
this.begin.pos,
this.dict.getSequence(this.begin.tid).getSequenceLength() /* whole chromosome size */
);
/** increase the number of opened streams */
this.nsplits.incrementAndGet();
}
/** while there is no more variant available on this chromosome , open the next chromosome for reading */
while(!this.variantIter.hasNext()) {
/* close previous iterator */
this.variantIter.close();
this.variantIter = null;
if(this.begin.tid == this.end.tid) /* this is the last chromosome */
{
close();
return null;
}
else
{
this.begin = new ContigPos(this.begin.tid+1, 0);
this.variantIter = this.vcfFileReader.query(
this.begin.contig,
this.begin.pos,
this.dict.getSequence(this.begin.tid).getSequenceLength() /* whole chromosome size */
);
}
}
/* get the next variant */
final VariantContext ctx = this.variantIter.next();
/* update 'begin' */
this.begin= new ContigPos(ctx.getContig(), ctx.getStart());
/** close if the end of the genomic location was reached */
if((this.begin.tid > this.end.tid) ||
(this.begin.tid == this.end.tid && this.begin.pos >= this.end.pos) ) {
close();
return null;
}
this._peeked = ctx;
return this._peeked;
}
private VariantContext next() {
final VariantContext ctx = this.peek();
if(ctx==null) return null;
this._peeked=null;
return ctx;
}
/** If a remaining element exists, performs the given action on it, returning true; else returns false. */
@Override
public boolean tryAdvance(Consumer<? super VariantContext> action) {
final VariantContext ctx = this.next();
if(ctx==null) {
close();
return false;
}
action.accept(ctx);
return true;
}
@Override
public Spliterator<VariantContext> trySplit() {
final VariantContext ctx = this.peek();
/** no more variant to read, can't split */
if(ctx==null) return null;
/** too many opened VCFFile reader, can't split */
if( this.nsplits.get() > 5) return null;
long start = this.begin.genomicIndex();
long distance = this.end.genomicIndex() - start;
/** distance between begin and end is greater than 1Mb */
while(distance > 1E6 )
{
distance = distance/2;
/** middle genomic index */
final ContigPos mid = new ContigPos(start + distance);
/** create a new VariantContextSpliterator starting from mid and ending at this.end */
final VariantContextSpliterator next = new VariantContextSpliterator(this,mid,this.end);
if(next.peek()==null) {
next.close();
continue;
}
/* update this.end to 'mid' */
this.end= mid;
//System.err.println("split successful:after split "+toString()+" and next="+next);
return next;
}
return null;
}
@Override
public long estimateSize() {
return Long.MAX_VALUE;
}
@Override
public int characteristics() {
return DISTINCT | NONNULL | IMMUTABLE;
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment