Last active
August 29, 2015 14:10
-
-
Save brentp/18874051f2d2b09b6c06 to your computer and use it in GitHub Desktop.
Streaming relation (overlap, distance, KNN) testing of (any number of) sorted files of intervals.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"bufio" | |
"compress/gzip" | |
"container/heap" | |
"fmt" | |
"io" | |
"os" | |
"strconv" | |
"strings" | |
) | |
type Interface interface { | |
Chrom() string | |
Start() uint32 | |
End() uint32 | |
Related() []*Interface | |
AddRelated(*Interface) | |
Source() *uint32 | |
String() string | |
} | |
// example satisfying interface | |
type Interval struct { | |
chrom string | |
start uint32 | |
end uint32 | |
line string | |
_source uint32 // internal | |
related []*Interface | |
} | |
func (i *Interval) Chrom() string { return i.chrom } | |
func (i *Interval) Start() uint32 { return i.start } | |
func (i *Interval) End() uint32 { return i.end } | |
func (i *Interval) Related() []*Interface { return i.related } | |
func (i *Interval) AddRelated(b *Interface) { i.related = append(i.related, b) } | |
func (i *Interval) Source() *uint32 { return &i._source } | |
func (i *Interval) String() string { | |
if i._source == 0 && len(i.related) > 0 { | |
//print(*(*next_interval).Source()) | |
return i.line | |
} | |
return "" | |
} | |
type Q []*Interface | |
func (q Q) Len() int { return len(q) } | |
func (q Q) Less(i, j int) bool { | |
if (*q[i]).Chrom() == (*q[j]).Chrom() { | |
return (*q[i]).Start() < (*q[j]).Start() | |
} | |
return (*q[i]).Chrom() < (*q[j]).Chrom() | |
} | |
func (q Q) Swap(i, j int) { | |
if i < len(q) { | |
q[j], q[i] = q[i], q[j] | |
} | |
} | |
func (q *Q) Push(i interface{}) { | |
iv := i.(*Interface) | |
*q = append(*q, iv) | |
} | |
func (q *Q) Pop() interface{} { | |
n := len(*q) | |
if n == 0 { | |
return nil | |
} | |
old := *q | |
iv := old[n-1] | |
*q = old[0 : n-1] | |
return iv | |
} | |
func relate(a *Interface, b *Interface) { | |
(*a).AddRelated(b) | |
(*b).AddRelated(a) | |
} | |
// example function to check for overlap or check within distance | |
func check_related_distance(a *Interface, b *Interface) bool { | |
distance := uint32(0) | |
// note with distance == 0 this just overlap. | |
return (*b).Chrom() == (*a).Chrom() && (*b).Start()-distance < (*a).End() | |
} | |
// https://gist.github.com/soniakeys/2433004 | |
// http://stackoverflow.com/questions/18827800/init-of-slice-in-struct | |
// https://gist.github.com/samuell/5591369 | |
func Filter(s []*Interface) []*Interface { | |
p := make([]*Interface, len(s)) | |
j := 0 | |
for _, v := range s { | |
if v != nil { | |
p[j] = v | |
j += 1 | |
} | |
} | |
return p[:j] | |
} | |
func IRelate(stream IntervalChannel, | |
check_related func(a *Interface, b *Interface) bool) { | |
cache := make([]*Interface, 0, 128) | |
cache = append(cache, <-stream) | |
print := func(i *Interface) { | |
s := (*i).String() | |
if s != "" { | |
fmt.Println(s) | |
} | |
} | |
nils := 0 | |
for interval := range stream { | |
for i, c := range cache { | |
if c == nil { | |
continue | |
} | |
if check_related(c, interval) { | |
relate(c, interval) | |
} else { | |
print(c) | |
cache[i] = nil | |
nils += 1 | |
} | |
} | |
// see: https://github.com/golang/go/wiki/SliceTricks | |
if nils > 10 { | |
cache = Filter(cache) | |
nils = 0 | |
} | |
cache = append(cache, interval) | |
} | |
for _, c := range Filter(cache) { | |
print(c) | |
} | |
} | |
type IntervalChannel <-chan *Interface | |
func Merge(streams ...IntervalChannel) IntervalChannel { | |
q := make(Q, 0, len(streams)) | |
for i, stream := range streams { | |
interval := <-stream | |
if interval != nil { | |
*(*interval).Source() = uint32(i) | |
heap.Push(&q, interval) | |
} | |
} | |
ch := make(chan *Interface, 16) | |
go func() { | |
for iv := heap.Pop(&q); len(q) > 0; iv = heap.Pop(&q) { | |
interval := iv.(*Interface) | |
source := *(*interval).Source() | |
next_interval := <-streams[source] | |
if next_interval != nil { | |
*(*next_interval).Source() = source | |
heap.Push(&q, next_interval) | |
//print(*(*next_interval).Source()) | |
} | |
ch <- interval | |
} | |
close(ch) | |
}() | |
return ch | |
} | |
func IntervalFromBedLine(line string) *Interval { | |
fields := strings.SplitN(line, "\t", 4) | |
start, err := strconv.ParseUint(fields[1], 10, 32) | |
if err != nil { | |
panic("start") | |
} | |
end, err := strconv.ParseUint(fields[2], 10, 32) | |
if err != nil { | |
panic("end") | |
} | |
i := Interval{chrom: fields[0], start: uint32(start), end: uint32(end), line: line} | |
i.related = make([]*Interface, 0, 16) | |
return &i | |
} | |
func OpenScanFile(file string) (*bufio.Scanner, io.ReadCloser, error) { | |
var f io.ReadCloser | |
f, err := os.Open(file) | |
if err != nil { | |
return nil, f, err | |
} | |
if strings.HasSuffix(file, ".gz") { | |
f, err = gzip.NewReader(f) | |
} | |
scanner := bufio.NewScanner(f) | |
scanner.Split(bufio.ScanLines) | |
return scanner, f, nil | |
} | |
func IterateBed(file string) IntervalChannel { | |
scanner, f, err := OpenScanFile(file) | |
if err != nil { | |
panic(err) | |
} | |
ch := make(chan *Interface, 32) | |
go func() { | |
defer f.Close() | |
for scanner.Scan() { | |
line := scanner.Text() | |
var i Interface | |
i = IntervalFromBedLine(line) | |
ch <- &i //IntervalFromBedLine(line).(Interface) | |
} | |
// returning nil as a sentinel so the Merge won't block | |
ch <- nil | |
close(ch) | |
}() | |
return ch | |
} | |
func main() { | |
anext := IterateBed("/usr/local/src/poverlap/data/replication_timing.hg19.bed.gz") | |
bnext := IterateBed("/usr/local/src/poverlap/data/hg19.gene-features.bed") | |
merged := Merge(anext, bnext) | |
var i Interface | |
// http://stackoverflow.com/questions/4799905/casting-back-to-more-specialised-interface | |
i = &Interval{chrom: "chr3"} | |
i = i | |
/* | |
f, err := os.Create("irelate.pprof") | |
if err != nil { | |
panic("bad") | |
} | |
pprof.StartCPUProfile(f) | |
defer pprof.StopCPUProfile() | |
*/ | |
IRelate(merged, check_related_distance) | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""Streaming relation (overlap, distance, KNN) testing of (any number of) sorted files of intervals.""" | |
from __future__ import print_function | |
import sys | |
import gzip | |
import heapq | |
import itertools as it | |
from operator import attrgetter | |
from collections import namedtuple | |
try: | |
filter = it.ifilter | |
except AttributeError: # python3 | |
pass | |
######################################################################## | |
######################################################################## | |
# this section is mostly uninteresting code to: | |
# 1. "parse" the intervals | |
# 2. sort them into a single iterable across files (using heapq.merge in python). | |
# 3. group them by chrom (using itertools.groupby) | |
# this is all done lazily streaming over the intervals. | |
######################################################################## | |
######################################################################## | |
def xopen(f): | |
return gzip.open(f) if f.endswith('.gz') \ | |
else sys.stdin if f == "-" \ | |
else open(f) | |
class BedIter(object): | |
__slots__ = "fh chrom start end header".split() | |
def __init__(self, fname, chrom=0, start=1, end=2): | |
self.fh = xopen(fname) | |
self.chrom, self.start, self.end = chrom, start, end | |
self.header = None | |
def __iter__(self): | |
return self | |
def next(self): | |
line = next(self.fh).split("\t") | |
chrom, start = line[self.chrom], int(line[self.start]) | |
# yield chrom and start so that heapq.merge works | |
return Interval(chrom, start, int(line[self.end]), self.fh, line, []) | |
__next__ = next | |
def merge(*beds): | |
beds = [BedIter(f) for f in beds] | |
for item in heapq.merge(*beds): | |
yield item | |
def relate(merged): | |
iterable = it.groupby(merged, attrgetter('chrom')) | |
for chrom, li in iterable: | |
# we know everything in li is from the same chrom | |
for interval in irelate(li): | |
yield interval | |
############################################################################ | |
############################################################################ | |
# The section below is more interesting with the `irelate` function taking | |
# an iterable of intervals and simply sending the appropriate intervals to | |
# `check_related` and then to `report`. Each time `check_related` returns | |
# True, info about the related interval is added to the other so the relations | |
# can then be accessed in `report` | |
# related will be a list of all things that are related to the given interval | |
Interval = namedtuple('Interval', 'chrom start end fh line related'.split()) | |
# example function to check for overlap or check within distance | |
def check_related_distance(a, b, distance=0): | |
# note with distance == 0 this just overlap. | |
assert a.start <= b.start and a.chrom == b.chrom | |
return b.start - distance < a.end | |
# example function to get k-nearest neighbors | |
def check_related_knn(a, b, n=3): | |
# the first n checked would be the n_closest, but need to consider ties | |
# the report function can decide what to do with them. | |
assert a.start <= b.start and a.chrom == b.chrom | |
if len(a.related) >= n: | |
# TODO: fix this. | |
if a.related[-1].start - a.end < b.start - a.end: | |
return False | |
else: | |
return True | |
# example function to report an interval | |
def report(interval, fname=sys.argv[1]): | |
# can get all information about related intervals with interval.related. | |
if len(interval.related) == 0: return None | |
# all intervals are sent here, only report those from a.bed | |
if interval.fh.name != fname: return None | |
return "{i.chrom}\t{i.start}\t{i.end}\t{i.fh.name}\t{l}".format(i=interval, l=len(interval.related)) | |
def irelate(interval_iter, check_related=check_related_distance, report=report, include_self_overlaps=False): | |
""" | |
Flexible overlap, KNN or proximity testing and reporting. | |
Arguments | |
--------- | |
interval_iter: iterable | |
a lazy iterable of Intervals sorted by start position and from the same | |
chromosome. | |
check_related: function(a Interval, b Interval) -> bool | |
a function that accepts 2 intervals and tells if they are related. | |
Since the input is assumed to be sorted, it is assumed that if the | |
function returns false, the a Interval can not possibly be related to | |
any interval after the b Interval and so it can be yielded (and not | |
tested against any remaining intervals in the stream). | |
See check_related_distance and check_related_knn for example. | |
report: function(i Interval) -> string | |
what to report from the interval. Intervals that it overlaps are | |
available in i.related | |
""" | |
cache = [next(interval_iter)] | |
for interval in interval_iter: | |
for i, c in enumerate(cache): | |
if check_related(c, interval): | |
# they must overlap. add each to the other's related pile | |
if c.fh != interval.fh or include_self_overlaps: | |
c.related.append(interval) | |
interval.related.append(c) | |
else: | |
yield report(c) | |
cache[i] = None | |
cache = list(filter(None, cache)) + [interval] | |
for c in cache: | |
yield report(c) | |
if __name__ == "__main__": | |
import sys | |
for b in relate(merge(*sys.argv[1:])): | |
if b is None: continue | |
print(b) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"container/heap" | |
"fmt" | |
"testing" | |
) | |
func TestInterval(t *testing.T) { | |
a := Interval{chrom: "chr1", start: 1234, end: 5678, | |
line: "chr1\t1234\t5678"} | |
if a.chrom != "chr1" { | |
t.Error("expected \"chr1\", got", a.chrom) | |
} | |
if a.start != 1234 { | |
t.Error("expected start = 1234, got", a.start) | |
} | |
if a.end != 5678 { | |
t.Error("expected start = 5678, got", a.end) | |
} | |
s := fmt.Sprintf("%s", a) | |
if len(s) == 0 { | |
t.Error("bad String") | |
} | |
} | |
func TestRelate(t *testing.T) { | |
a := Interval{chrom: "chr1", start: 1234, end: 5678, | |
line: "chr1\t1234\t5678"} | |
b := &Interval{chrom: "chr1", start: 9234, end: 9678, | |
line: "chr1\t9234\t9678"} | |
if len(a.related) != 0 { | |
t.Error("a.related should be empty") | |
} | |
if len(b.related) != 0 { | |
t.Error("b.related should be empty") | |
} | |
relate(&a, b) | |
if len(a.related) != 1 { | |
t.Error("a.related should have 1 interval") | |
} | |
if len(b.related) != 1 { | |
t.Error("b.related should have 1 interval") | |
} | |
if a.related[0] != b { | |
t.Error("a.related[0] should be b") | |
} | |
} | |
func TestQ(t *testing.T) { | |
a := &Interval{chrom: "chr1", start: 1234, end: 5678, | |
line: "chr1\t1234\t5678"} | |
b := &Interval{chrom: "chr1", start: 9234, end: 9678, | |
line: "chr1\t9234\t9678"} | |
c := &Interval{chrom: "chr2", start: 9234, end: 9678, | |
line: "chr1\t9234\t9678"} | |
q := make(Q, 0) | |
heap.Init(&q) | |
heap.Push(&q, b) | |
heap.Push(&q, a) | |
heap.Push(&q, c) | |
first := heap.Pop(&q) | |
if first != a { | |
t.Error("first interval off q should be a") | |
} | |
second := heap.Pop(&q) | |
if second != b { | |
t.Error("2nd interval off q should be b") | |
} | |
third := heap.Pop(&q) | |
if third != c { | |
t.Error("third interval off q should be c") | |
} | |
if heap.Pop(&q) != nil { | |
t.Error("empty q should return nil") | |
} | |
} | |
func TestMerge(t *testing.T) { | |
a := &Interval{chrom: "chr1", start: 1234, end: 5678, | |
line: "chr1\t1234\t5678"} | |
b := &Interval{chrom: "chr1", start: 9234, end: 9678, | |
line: "chr1\t9234\t9678"} | |
c := &Interval{chrom: "chr2", start: 9234, end: 9678, | |
line: "chr1\t9234\t9678"} | |
ai := 0 | |
bi := 0 | |
ci := 0 | |
nexta := func() *Interval { | |
if ai == 0 { | |
ai = 1 | |
return a | |
} | |
return nil | |
} | |
nextb := func() *Interval { | |
if bi == 0 { | |
bi = 1 | |
return b | |
} | |
return nil | |
} | |
nextc := func() *Interval { | |
if ci < 2 { | |
ci = ci + 1 | |
return c | |
} | |
return nil | |
} | |
merged := Merge(nexta, nextb, nextc) | |
first := merged() | |
if first != a { | |
t.Error("first interval off merge should be a") | |
} | |
second := merged() | |
if second != b { | |
t.Error("2nd interval off merge should be b") | |
} | |
third := merged() | |
if third != c { | |
t.Error("third interval off merge should be c") | |
} | |
fourth := merged() | |
if fourth != c { | |
t.Error("fourth interval off merge should be c") | |
} | |
if merged() != nil { | |
t.Error("empty Merge should return nil") | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment