Created
July 6, 2013 00:15
-
-
Save jwintersinger/5937984 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from HTSeq import GenomicInterval | |
from collections import defaultdict | |
class IntervalCmp(object): | |
''' | |
Implements GenomicInterval comparison functions that ignore strandedness, | |
unlike the methods included in HTSeq. | |
''' | |
@staticmethod | |
def overlaps(a, b): | |
return ( | |
a.chrom == b.chrom and | |
min(a.end, b.end) > max(a.start, b.start) | |
) | |
@staticmethod | |
def contains(a, b): | |
return ( | |
a.chrom == b.chrom and | |
a.start <= b.start and | |
a.end >= b.end | |
) | |
@staticmethod | |
def precedes(a, b): | |
return ( | |
IntervalCmp.overlaps(a, b) and | |
a.start < b.start and | |
a.end < b.end | |
) | |
@staticmethod | |
def follows(a, b): | |
return IntervalCmp.precedes(b, a) | |
class GenomicIntervalSet(object): | |
''' | |
Enables operations on sets of GenomicIntervals. Currently supports only | |
subtraction, allowing you to, for example, subtract GenomicIntervals | |
representing annotated genes from a set of GenomicIntervals representing | |
BLAST hits, so you can retrieve the sequences of only intervals corresponding | |
to hits that are not within genes. | |
''' | |
def __init__(self): | |
self._intervals = defaultdict(list) | |
def add(self, interval): | |
self._intervals[interval.chrom].append(interval) | |
def __eq__(self, other): | |
return self._intervals == other._intervals | |
def __repr__(self): | |
return self.__class__.__name__ + repr(self._intervals.items()) | |
def __getitem__(self, chrom): | |
return self._intervals[chrom] | |
def __sub__(self, other): | |
results = GenomicIntervalSet() | |
for chrom in self._intervals.keys(): | |
minuhend = self._intervals[chrom] | |
subtrahend = other._intervals[chrom] | |
create_gi = lambda start, end: GenomicInterval(chrom, start, end, '.') | |
chr_result = minuhend | |
for j in subtrahend: | |
temp = [] | |
for i in chr_result: | |
if IntervalCmp.overlaps(i, j): | |
if IntervalCmp.contains(i, j): | |
temp.append( create_gi(i.start, j.start) ) | |
temp.append( create_gi(j.end, i.end) ) | |
elif IntervalCmp.contains(j, i): | |
# Do nothing, as all of i is consumed by j. | |
pass | |
elif IntervalCmp.precedes(j, i): | |
temp.append( create_gi(j.end, i.end) ) | |
elif IntervalCmp.follows(j, i): | |
temp.append( create_gi(i.start, j.start) ) | |
else: | |
temp.append(i) | |
chr_result = temp | |
for interval in chr_result: | |
# Copy intervals to eliminate strand (which would otherwise be | |
# preserved when no intervals are in subtrahend), and to ensure that | |
# new intervals are returned to user, rather than (some) references to | |
# ones that he provided as input. | |
interval = create_gi(interval.start, interval.end) | |
results.add(interval) | |
return results | |
import unittest | |
class TestGenomicIntervalSet(unittest.TestCase): | |
def setUp(self): | |
self.set1 = GenomicIntervalSet() | |
self.set1.add( GenomicInterval('chrA', 1, 1000, '+') ) | |
self.set1.add( GenomicInterval('chrA', 1100, 1200, '-') ) | |
self.set1.add( GenomicInterval('chrB', 1100, 1300, '+') ) | |
self.set2 = GenomicIntervalSet() | |
self.set2.add( GenomicInterval('chrA', 30, 50, '+') ) | |
self.set2.add( GenomicInterval('chrA', 60, 200, '+') ) | |
self.set2.add( GenomicInterval('chrA', 1050, 1150, '+') ) | |
self.set2.add( GenomicInterval('chrB', 1000, 1400, '-') ) | |
def test_subtract(self): | |
expected = [ | |
GenomicInterval('chrA', 1, 30, '.'), | |
GenomicInterval('chrA', 50, 60, '.'), | |
GenomicInterval('chrA', 200, 1000, '.'), | |
GenomicInterval('chrA', 1150, 1200, '.'), | |
] | |
expected_set = GenomicIntervalSet() | |
for interval in expected: | |
expected_set.add(interval) | |
self.assertEqual(self.set1 - self.set2, expected_set) | |
if __name__ == '__main__': | |
unittest.main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment