Skip to content

Instantly share code, notes, and snippets.

@dpryan79
Created May 13, 2019 20:11
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save dpryan79/84d806d2fbe8c0be55978c3aad1c23d9 to your computer and use it in GitHub Desktop.
Save dpryan79/84d806d2fbe8c0be55978c3aad1c23d9 to your computer and use it in GitHub Desktop.
Generate a blacklist file of polyX stretches of a given minimum length not within a specified distance of a TES
#!/usr/bin/env python
# This can take up to ~20 minutes and use up to ~2GB RAM for mammals
import argparse
import py2bit
from deeptoolsintervals import GTF, tree
from deeptoolsintervals.parse import openPossiblyCompressed, parseExonBounds, findRandomLabel
parser = argparse.ArgumentParser(description="Generate a blacklist file of polyX stretches of a given minimum length not within a specified distance of a TES")
parser.add_argument("--output", "-o", help="Output file", required=True)
parser.add_argument("--tb", help="2bit file", required=True)
parser.add_argument("--bed", help="BED file containing transcripts", required=True)
parser.add_argument("--minLength", help="Minimum length of a polyX stretch (default: 6)", type=int, default=6)
parser.add_argument("--base", help="The base to check for (A or T)", required=True, choices=['A', 'T'])
parser.add_argument("--minDistance", help="Minimum distance from a TES to not exclude a site (default: 100)", type=int, default=100)
parser.add_argument("--extend", help="Number of bases to extend regions (default: 5)", type=int, default=5)
args = parser.parse_args()
tb = py2bit.open(args.tb)
o = open(args.output, "w")
class TES(GTF):
def parseBEDcore(self, line, ncols):
strand = 3
cols = line.split("\t")
name = "{0}:{1}-{2}".format(cols[0], cols[1], cols[2])
if int(cols[1]) < 0:
cols[1] = 0
if int(cols[1]) >= int(cols[2]):
sys.stderr.write("Warning: {0}:{1}-{2} is an invalid BED interval! Ignoring it.\n".format(cols[0], cols[1], cols[2]))
return
# BED6/BED12: set name and strand
score = '.'
if ncols > 3:
name = cols[3]
if cols[5] == '+':
strand = 0
elif cols[5] == '-':
strand = 1
score = cols[4]
# filter by strand
if strand != 3:
if self.strand == "+" and strand == 1:
return
elif self.strand == "-" and strand == 0:
return
# Ensure that the name is unique
name = findRandomLabel(self.exons[self.labelIdx], name)
assert(len(cols) == 12)
exons = parseExonBounds(int(cols[1]), int(cols[2]), int(cols[9]), cols[10], cols[11])
# Extend by strand around the TES
exonsFinal = []
lenLeft = 2 * self.minDistance + 1
if strand == 3 or (self.strand == "+" and strand == 0):
exons[-1] = (exons[-1][0], exons[-1][1] + self.minDistance)
for exon in exons[::-1]:
exonLen = exon[1] - exon[0]
if exonLen <= lenLeft:
lenLeft -= exonLen
exonsFinal.insert(0, exon)
else:
exonsFinal.insert(0, (exon[1] - lenLeft, exon[1]))
break
if lenLeft <= 0:
break
elif self.strand == "-" and strand == 1:
_ = exons[0][0] -self.minDistance
_ = max(0, _)
exons[0] = (_, exons[0][1])
for exon in exons:
exonLen = exon[1] - exon[0]
if exonLen <= lenLeft:
lenLeft -= exonLen
exonsFinal.append(exon)
else:
exonsFinal.append((exon[0], exon[0] + lenLeft))
break
if lenLeft <= 0:
break
if len(exonsFinal) == 0:
return
self.tree.addEntry(self.mungeChromosome(cols[0]), exonsFinal[0][0], exonsFinal[-1][1], name, strand, self.labelIdx, score)
self.exons[self.labelIdx][name] = exonsFinal
def __init__(self, fname, minDistance=100, strand="+"):
self.fname = [fname]
self.filename = fname
self.filename = ""
self.chroms = []
self.exons = []
self.labels = []
self.transcriptIDduplicated = []
self.tree = tree.initTree()
self.labelIdx = 0
self.keepExons = True
self.defaultGroup = None
self.verbose = False
self.minDistance = minDistance
self.strand = strand
fp = openPossiblyCompressed(fname)
line, labelColumn = self.firstNonComment(fp)
assert(line) # This will only fail on empty files
line = line.strip()
ftype = self.inferType(fp, line, labelColumn)
self.parseBED(fp, line, 12, labelColumn)
fp.close()
# Sanity check
if self.tree.countEntries() == 0:
raise RuntimeError("None of the input BED/GTF files had valid regions")
# vine -> tree
self.tree.finish()
def processLast(last, chrom, idx, idx2, o, bed):
if args.base == "A":
s = max(0, idx - args.extend)
e = idx2
else:
s = idx
e = idx2 + args.extend
for overlaps in bed.findOverlaps(chrom, s, e):
for exon in overlaps[4]:
if exon[0] < s and exon[1] > s:
return
if exon[0] >= s and exon[0] < e:
return
if not last[0]:
last[0] = chrom
last[1] = s
last[2] = e
else:
if last[0] == chrom and s <= last[2]:
last[2] = e
else:
o.write("{}\t{}\t{}\n".format(*last))
last[0] = chrom
last[1] = s
last[2] = e
if args.base == "A":
bed = TES(args.bed, minDistance=args.minDistance)
else:
bed = TES(args.bed, minDistance=args.minDistance, strand="-")
last = [None, None, None]
for chrom, chromLength in tb.chroms().items():
s = tb.sequence(chrom)
idx = 0
idx2 = 0
while idx < chromLength - 1:
if s[idx] == args.base:
idx2 = idx + 1
while s[idx2] == args.base:
if idx2 + 1 >= chromLength:
break
idx2 += 1
if idx2 - idx >= args.minLength - 1:
processLast(last, chrom, idx, idx2 + 1, o, bed)
idx = idx2 + 1
else:
idx += 1
if last[0] is not None:
o.write("{}\t{}\t{}\n".format(*last))
o.close()
tb.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment