Skip to content

Instantly share code, notes, and snippets.

@ipurusho
Last active August 29, 2015 14:22
Show Gist options
  • Save ipurusho/c2111cbe329b72a2320e to your computer and use it in GitHub Desktop.
Save ipurusho/c2111cbe329b72a2320e to your computer and use it in GitHub Desktop.
calculate coverage of features in a Feature RDD using binning
def featureCoverage(reference: RDD[Feature],reads: RDD[AlignmentRecord],bins:Int): RDD[(String, Iterable[Double])] = {
val getBinsForward = for{
feature <- reference
bin = bins
interval = ((((feature.getEnd.toDouble-feature.getStart.toDouble)/bin).toDouble).ceil).toInt
strand = feature.getFeatureType.toString
start = feature.getStart.toInt
end = feature.getEnd.toInt
refName = feature.getContig.getContigName.toString
identifier = feature.getFeatureId.toString
i <- 0 until bin if strand == "+"
newStart = start + i*interval
newEnd = (newStart + interval)
newFeature = Feature.newBuilder.setStart(newStart).setEnd(newEnd).setFeatureId(identifier+"."+i).build
binnedRefRegionForward = (ReferenceRegion(refName, newStart,newEnd),newFeature)
} yield binnedRefRegionForward
val getBinsReverse = for{
feature <- reference
bin = bins
interval = ((((feature.getEnd.toDouble-feature.getStart.toDouble)/bin).toDouble).ceil).toInt
strand = feature.getFeatureType
start = feature.getStart.toInt
end = feature.getEnd.toInt
refName = feature.getContig.getContigName.toString
identifier = feature.getFeatureId.toString
i <- bin-1 to 0 by -1 if strand == "-"
newEnd = (end - i*interval)
newStart = newEnd - interval
newFeature = Feature.newBuilder.setStart(newStart).setEnd(newEnd).setFeatureId(identifier+"."+i).build
binnedRefRegionReverse = (ReferenceRegion(refName, newStart,newEnd),newFeature)
} yield binnedRefRegionReverse
val getBins = getBinsForward.union(getBinsReverse)
val rMap = reads.map(r => ReferenceRegion(r) -> r)
val r = BroadcastRegionJoin.partitionAndJoin(getBins, rMap)
val numReadsPerFeature = r.map(_._1 -> 1.toDouble).reduceByKey(_ + _)
val featureCov = for(cov <- numReadsPerFeature) yield ((cov._1.getFeatureId.split('.')(0),cov._1.getFeatureId.split('.')(1)),cov._2.toDouble)
val nullCov = for{
availFeatures <- featureCov
i <- 0 until bins} yield ((availFeatures._1._1,i.toString),0.toDouble)
val allCov = featureCov.union(nullCov.reduceByKey(_+_)).reduceByKey(_+_)
val featureCovArray = allCov.sortBy(_._1).groupBy(_._1._1).mapValues(_.map(_._2))
val emptyCov = for{i <- 1 to reference.count().toInt-featureCovArray.count().toInt} yield (java.util.UUID.randomUUID.toString,Iterable.fill(bins)(0.toDouble))
featureCovArray.union(sc.parallelize(emptyCov))
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment