Skip to content

Instantly share code, notes, and snippets.

@jwintersinger
Created July 6, 2013 00:15
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save jwintersinger/5937984 to your computer and use it in GitHub Desktop.
Save jwintersinger/5937984 to your computer and use it in GitHub Desktop.
from HTSeq import GenomicInterval
from collections import defaultdict
class IntervalCmp(object):
'''
Implements GenomicInterval comparison functions that ignore strandedness,
unlike the methods included in HTSeq.
'''
@staticmethod
def overlaps(a, b):
return (
a.chrom == b.chrom and
min(a.end, b.end) > max(a.start, b.start)
)
@staticmethod
def contains(a, b):
return (
a.chrom == b.chrom and
a.start <= b.start and
a.end >= b.end
)
@staticmethod
def precedes(a, b):
return (
IntervalCmp.overlaps(a, b) and
a.start < b.start and
a.end < b.end
)
@staticmethod
def follows(a, b):
return IntervalCmp.precedes(b, a)
class GenomicIntervalSet(object):
'''
Enables operations on sets of GenomicIntervals. Currently supports only
subtraction, allowing you to, for example, subtract GenomicIntervals
representing annotated genes from a set of GenomicIntervals representing
BLAST hits, so you can retrieve the sequences of only intervals corresponding
to hits that are not within genes.
'''
def __init__(self):
self._intervals = defaultdict(list)
def add(self, interval):
self._intervals[interval.chrom].append(interval)
def __eq__(self, other):
return self._intervals == other._intervals
def __repr__(self):
return self.__class__.__name__ + repr(self._intervals.items())
def __getitem__(self, chrom):
return self._intervals[chrom]
def __sub__(self, other):
results = GenomicIntervalSet()
for chrom in self._intervals.keys():
minuhend = self._intervals[chrom]
subtrahend = other._intervals[chrom]
create_gi = lambda start, end: GenomicInterval(chrom, start, end, '.')
chr_result = minuhend
for j in subtrahend:
temp = []
for i in chr_result:
if IntervalCmp.overlaps(i, j):
if IntervalCmp.contains(i, j):
temp.append( create_gi(i.start, j.start) )
temp.append( create_gi(j.end, i.end) )
elif IntervalCmp.contains(j, i):
# Do nothing, as all of i is consumed by j.
pass
elif IntervalCmp.precedes(j, i):
temp.append( create_gi(j.end, i.end) )
elif IntervalCmp.follows(j, i):
temp.append( create_gi(i.start, j.start) )
else:
temp.append(i)
chr_result = temp
for interval in chr_result:
# Copy intervals to eliminate strand (which would otherwise be
# preserved when no intervals are in subtrahend), and to ensure that
# new intervals are returned to user, rather than (some) references to
# ones that he provided as input.
interval = create_gi(interval.start, interval.end)
results.add(interval)
return results
import unittest
class TestGenomicIntervalSet(unittest.TestCase):
def setUp(self):
self.set1 = GenomicIntervalSet()
self.set1.add( GenomicInterval('chrA', 1, 1000, '+') )
self.set1.add( GenomicInterval('chrA', 1100, 1200, '-') )
self.set1.add( GenomicInterval('chrB', 1100, 1300, '+') )
self.set2 = GenomicIntervalSet()
self.set2.add( GenomicInterval('chrA', 30, 50, '+') )
self.set2.add( GenomicInterval('chrA', 60, 200, '+') )
self.set2.add( GenomicInterval('chrA', 1050, 1150, '+') )
self.set2.add( GenomicInterval('chrB', 1000, 1400, '-') )
def test_subtract(self):
expected = [
GenomicInterval('chrA', 1, 30, '.'),
GenomicInterval('chrA', 50, 60, '.'),
GenomicInterval('chrA', 200, 1000, '.'),
GenomicInterval('chrA', 1150, 1200, '.'),
]
expected_set = GenomicIntervalSet()
for interval in expected:
expected_set.add(interval)
self.assertEqual(self.set1 - self.set2, expected_set)
if __name__ == '__main__':
unittest.main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment