Skip to content

Instantly share code, notes, and snippets.

@brentp
Last active August 29, 2015 14:10
Show Gist options
  • Save brentp/18874051f2d2b09b6c06 to your computer and use it in GitHub Desktop.
Save brentp/18874051f2d2b09b6c06 to your computer and use it in GitHub Desktop.
Streaming relation (overlap, distance, KNN) testing of (any number of) sorted files of intervals.
package main
import (
"bufio"
"compress/gzip"
"container/heap"
"fmt"
"io"
"os"
"strconv"
"strings"
)
type Interface interface {
Chrom() string
Start() uint32
End() uint32
Related() []*Interface
AddRelated(*Interface)
Source() *uint32
String() string
}
// example satisfying interface
type Interval struct {
chrom string
start uint32
end uint32
line string
_source uint32 // internal
related []*Interface
}
func (i *Interval) Chrom() string { return i.chrom }
func (i *Interval) Start() uint32 { return i.start }
func (i *Interval) End() uint32 { return i.end }
func (i *Interval) Related() []*Interface { return i.related }
func (i *Interval) AddRelated(b *Interface) { i.related = append(i.related, b) }
func (i *Interval) Source() *uint32 { return &i._source }
func (i *Interval) String() string {
if i._source == 0 && len(i.related) > 0 {
//print(*(*next_interval).Source())
return i.line
}
return ""
}
type Q []*Interface
func (q Q) Len() int { return len(q) }
func (q Q) Less(i, j int) bool {
if (*q[i]).Chrom() == (*q[j]).Chrom() {
return (*q[i]).Start() < (*q[j]).Start()
}
return (*q[i]).Chrom() < (*q[j]).Chrom()
}
func (q Q) Swap(i, j int) {
if i < len(q) {
q[j], q[i] = q[i], q[j]
}
}
func (q *Q) Push(i interface{}) {
iv := i.(*Interface)
*q = append(*q, iv)
}
func (q *Q) Pop() interface{} {
n := len(*q)
if n == 0 {
return nil
}
old := *q
iv := old[n-1]
*q = old[0 : n-1]
return iv
}
func relate(a *Interface, b *Interface) {
(*a).AddRelated(b)
(*b).AddRelated(a)
}
// example function to check for overlap or check within distance
func check_related_distance(a *Interface, b *Interface) bool {
distance := uint32(0)
// note with distance == 0 this just overlap.
return (*b).Chrom() == (*a).Chrom() && (*b).Start()-distance < (*a).End()
}
// https://gist.github.com/soniakeys/2433004
// http://stackoverflow.com/questions/18827800/init-of-slice-in-struct
// https://gist.github.com/samuell/5591369
func Filter(s []*Interface) []*Interface {
p := make([]*Interface, len(s))
j := 0
for _, v := range s {
if v != nil {
p[j] = v
j += 1
}
}
return p[:j]
}
func IRelate(stream IntervalChannel,
check_related func(a *Interface, b *Interface) bool) {
cache := make([]*Interface, 0, 128)
cache = append(cache, <-stream)
print := func(i *Interface) {
s := (*i).String()
if s != "" {
fmt.Println(s)
}
}
nils := 0
for interval := range stream {
for i, c := range cache {
if c == nil {
continue
}
if check_related(c, interval) {
relate(c, interval)
} else {
print(c)
cache[i] = nil
nils += 1
}
}
// see: https://github.com/golang/go/wiki/SliceTricks
if nils > 10 {
cache = Filter(cache)
nils = 0
}
cache = append(cache, interval)
}
for _, c := range Filter(cache) {
print(c)
}
}
type IntervalChannel <-chan *Interface
func Merge(streams ...IntervalChannel) IntervalChannel {
q := make(Q, 0, len(streams))
for i, stream := range streams {
interval := <-stream
if interval != nil {
*(*interval).Source() = uint32(i)
heap.Push(&q, interval)
}
}
ch := make(chan *Interface, 16)
go func() {
for iv := heap.Pop(&q); len(q) > 0; iv = heap.Pop(&q) {
interval := iv.(*Interface)
source := *(*interval).Source()
next_interval := <-streams[source]
if next_interval != nil {
*(*next_interval).Source() = source
heap.Push(&q, next_interval)
//print(*(*next_interval).Source())
}
ch <- interval
}
close(ch)
}()
return ch
}
func IntervalFromBedLine(line string) *Interval {
fields := strings.SplitN(line, "\t", 4)
start, err := strconv.ParseUint(fields[1], 10, 32)
if err != nil {
panic("start")
}
end, err := strconv.ParseUint(fields[2], 10, 32)
if err != nil {
panic("end")
}
i := Interval{chrom: fields[0], start: uint32(start), end: uint32(end), line: line}
i.related = make([]*Interface, 0, 16)
return &i
}
func OpenScanFile(file string) (*bufio.Scanner, io.ReadCloser, error) {
var f io.ReadCloser
f, err := os.Open(file)
if err != nil {
return nil, f, err
}
if strings.HasSuffix(file, ".gz") {
f, err = gzip.NewReader(f)
}
scanner := bufio.NewScanner(f)
scanner.Split(bufio.ScanLines)
return scanner, f, nil
}
func IterateBed(file string) IntervalChannel {
scanner, f, err := OpenScanFile(file)
if err != nil {
panic(err)
}
ch := make(chan *Interface, 32)
go func() {
defer f.Close()
for scanner.Scan() {
line := scanner.Text()
var i Interface
i = IntervalFromBedLine(line)
ch <- &i //IntervalFromBedLine(line).(Interface)
}
// returning nil as a sentinel so the Merge won't block
ch <- nil
close(ch)
}()
return ch
}
func main() {
anext := IterateBed("/usr/local/src/poverlap/data/replication_timing.hg19.bed.gz")
bnext := IterateBed("/usr/local/src/poverlap/data/hg19.gene-features.bed")
merged := Merge(anext, bnext)
var i Interface
// http://stackoverflow.com/questions/4799905/casting-back-to-more-specialised-interface
i = &Interval{chrom: "chr3"}
i = i
/*
f, err := os.Create("irelate.pprof")
if err != nil {
panic("bad")
}
pprof.StartCPUProfile(f)
defer pprof.StopCPUProfile()
*/
IRelate(merged, check_related_distance)
}
"""Streaming relation (overlap, distance, KNN) testing of (any number of) sorted files of intervals."""
from __future__ import print_function
import sys
import gzip
import heapq
import itertools as it
from operator import attrgetter
from collections import namedtuple
try:
filter = it.ifilter
except AttributeError: # python3
pass
########################################################################
########################################################################
# this section is mostly uninteresting code to:
# 1. "parse" the intervals
# 2. sort them into a single iterable across files (using heapq.merge in python).
# 3. group them by chrom (using itertools.groupby)
# this is all done lazily streaming over the intervals.
########################################################################
########################################################################
def xopen(f):
return gzip.open(f) if f.endswith('.gz') \
else sys.stdin if f == "-" \
else open(f)
class BedIter(object):
__slots__ = "fh chrom start end header".split()
def __init__(self, fname, chrom=0, start=1, end=2):
self.fh = xopen(fname)
self.chrom, self.start, self.end = chrom, start, end
self.header = None
def __iter__(self):
return self
def next(self):
line = next(self.fh).split("\t")
chrom, start = line[self.chrom], int(line[self.start])
# yield chrom and start so that heapq.merge works
return Interval(chrom, start, int(line[self.end]), self.fh, line, [])
__next__ = next
def merge(*beds):
beds = [BedIter(f) for f in beds]
for item in heapq.merge(*beds):
yield item
def relate(merged):
iterable = it.groupby(merged, attrgetter('chrom'))
for chrom, li in iterable:
# we know everything in li is from the same chrom
for interval in irelate(li):
yield interval
############################################################################
############################################################################
# The section below is more interesting with the `irelate` function taking
# an iterable of intervals and simply sending the appropriate intervals to
# `check_related` and then to `report`. Each time `check_related` returns
# True, info about the related interval is added to the other so the relations
# can then be accessed in `report`
# related will be a list of all things that are related to the given interval
Interval = namedtuple('Interval', 'chrom start end fh line related'.split())
# example function to check for overlap or check within distance
def check_related_distance(a, b, distance=0):
# note with distance == 0 this just overlap.
assert a.start <= b.start and a.chrom == b.chrom
return b.start - distance < a.end
# example function to get k-nearest neighbors
def check_related_knn(a, b, n=3):
# the first n checked would be the n_closest, but need to consider ties
# the report function can decide what to do with them.
assert a.start <= b.start and a.chrom == b.chrom
if len(a.related) >= n:
# TODO: fix this.
if a.related[-1].start - a.end < b.start - a.end:
return False
else:
return True
# example function to report an interval
def report(interval, fname=sys.argv[1]):
# can get all information about related intervals with interval.related.
if len(interval.related) == 0: return None
# all intervals are sent here, only report those from a.bed
if interval.fh.name != fname: return None
return "{i.chrom}\t{i.start}\t{i.end}\t{i.fh.name}\t{l}".format(i=interval, l=len(interval.related))
def irelate(interval_iter, check_related=check_related_distance, report=report, include_self_overlaps=False):
"""
Flexible overlap, KNN or proximity testing and reporting.
Arguments
---------
interval_iter: iterable
a lazy iterable of Intervals sorted by start position and from the same
chromosome.
check_related: function(a Interval, b Interval) -> bool
a function that accepts 2 intervals and tells if they are related.
Since the input is assumed to be sorted, it is assumed that if the
function returns false, the a Interval can not possibly be related to
any interval after the b Interval and so it can be yielded (and not
tested against any remaining intervals in the stream).
See check_related_distance and check_related_knn for example.
report: function(i Interval) -> string
what to report from the interval. Intervals that it overlaps are
available in i.related
"""
cache = [next(interval_iter)]
for interval in interval_iter:
for i, c in enumerate(cache):
if check_related(c, interval):
# they must overlap. add each to the other's related pile
if c.fh != interval.fh or include_self_overlaps:
c.related.append(interval)
interval.related.append(c)
else:
yield report(c)
cache[i] = None
cache = list(filter(None, cache)) + [interval]
for c in cache:
yield report(c)
if __name__ == "__main__":
import sys
for b in relate(merge(*sys.argv[1:])):
if b is None: continue
print(b)
package main
import (
"container/heap"
"fmt"
"testing"
)
func TestInterval(t *testing.T) {
a := Interval{chrom: "chr1", start: 1234, end: 5678,
line: "chr1\t1234\t5678"}
if a.chrom != "chr1" {
t.Error("expected \"chr1\", got", a.chrom)
}
if a.start != 1234 {
t.Error("expected start = 1234, got", a.start)
}
if a.end != 5678 {
t.Error("expected start = 5678, got", a.end)
}
s := fmt.Sprintf("%s", a)
if len(s) == 0 {
t.Error("bad String")
}
}
func TestRelate(t *testing.T) {
a := Interval{chrom: "chr1", start: 1234, end: 5678,
line: "chr1\t1234\t5678"}
b := &Interval{chrom: "chr1", start: 9234, end: 9678,
line: "chr1\t9234\t9678"}
if len(a.related) != 0 {
t.Error("a.related should be empty")
}
if len(b.related) != 0 {
t.Error("b.related should be empty")
}
relate(&a, b)
if len(a.related) != 1 {
t.Error("a.related should have 1 interval")
}
if len(b.related) != 1 {
t.Error("b.related should have 1 interval")
}
if a.related[0] != b {
t.Error("a.related[0] should be b")
}
}
func TestQ(t *testing.T) {
a := &Interval{chrom: "chr1", start: 1234, end: 5678,
line: "chr1\t1234\t5678"}
b := &Interval{chrom: "chr1", start: 9234, end: 9678,
line: "chr1\t9234\t9678"}
c := &Interval{chrom: "chr2", start: 9234, end: 9678,
line: "chr1\t9234\t9678"}
q := make(Q, 0)
heap.Init(&q)
heap.Push(&q, b)
heap.Push(&q, a)
heap.Push(&q, c)
first := heap.Pop(&q)
if first != a {
t.Error("first interval off q should be a")
}
second := heap.Pop(&q)
if second != b {
t.Error("2nd interval off q should be b")
}
third := heap.Pop(&q)
if third != c {
t.Error("third interval off q should be c")
}
if heap.Pop(&q) != nil {
t.Error("empty q should return nil")
}
}
func TestMerge(t *testing.T) {
a := &Interval{chrom: "chr1", start: 1234, end: 5678,
line: "chr1\t1234\t5678"}
b := &Interval{chrom: "chr1", start: 9234, end: 9678,
line: "chr1\t9234\t9678"}
c := &Interval{chrom: "chr2", start: 9234, end: 9678,
line: "chr1\t9234\t9678"}
ai := 0
bi := 0
ci := 0
nexta := func() *Interval {
if ai == 0 {
ai = 1
return a
}
return nil
}
nextb := func() *Interval {
if bi == 0 {
bi = 1
return b
}
return nil
}
nextc := func() *Interval {
if ci < 2 {
ci = ci + 1
return c
}
return nil
}
merged := Merge(nexta, nextb, nextc)
first := merged()
if first != a {
t.Error("first interval off merge should be a")
}
second := merged()
if second != b {
t.Error("2nd interval off merge should be b")
}
third := merged()
if third != c {
t.Error("third interval off merge should be c")
}
fourth := merged()
if fourth != c {
t.Error("fourth interval off merge should be c")
}
if merged() != nil {
t.Error("empty Merge should return nil")
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment