Created
July 2, 2020 20:38
-
-
Save saketkc/d70fbb9f3b611a7c51a48591b35d4d90 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# coding: utf-8 | |
# In[1]: | |
from collections import defaultdict, OrderedDict | |
import warnings | |
import gffutils | |
import pybedtools | |
import pandas as pd | |
import copy | |
import os | |
import re | |
from gffutils.pybedtools_integration import tsses | |
from copy import deepcopy | |
from collections import OrderedDict, Callable | |
import errno | |
def mkdir_p(path): | |
try: | |
os.makedirs(path) | |
except OSError as exc: # Python >2.5 | |
if exc.errno == errno.EEXIST and os.path.isdir(path): | |
pass | |
else: | |
raise | |
class DefaultOrderedDict(OrderedDict): | |
# Source: http://stackoverflow.com/a/6190500/562769 | |
def __init__(self, default_factory=None, *a, **kw): | |
if (default_factory is not None and | |
not isinstance(default_factory, Callable)): | |
raise TypeError('first argument must be callable') | |
OrderedDict.__init__(self, *a, **kw) | |
self.default_factory = default_factory | |
def __getitem__(self, key): | |
try: | |
return OrderedDict.__getitem__(self, key) | |
except KeyError: | |
return self.__missing__(key) | |
def __missing__(self, key): | |
if self.default_factory is None: | |
raise KeyError(key) | |
self[key] = value = self.default_factory() | |
return value | |
def __reduce__(self): | |
if self.default_factory is None: | |
args = tuple() | |
else: | |
args = self.default_factory, | |
return type(self), args, None, None, self.items() | |
def copy(self): | |
return self.__copy__() | |
def __copy__(self): | |
return type(self)(self.default_factory, self) | |
def __deepcopy__(self, memo): | |
import copy | |
return type(self)(self.default_factory, | |
copy.deepcopy(self.items())) | |
def __repr__(self): | |
return 'OrderedDefaultDict(%s, %s)' % (self.default_factory, | |
OrderedDict.__repr__(self)) | |
# In[2]: | |
# Just need to change these | |
gtf = './Homo_sapiens.GRCh38.96.gtf' | |
gtf_db = '{}.db'.format(gtf) | |
prefix = './output_beds/' | |
mkdir_p(prefix) | |
#chrsizes = '/panfs/qcb-panasas/skchoudh/genomes/hg38/fasta/hg38.chrom.sizes' | |
# In[5]: | |
def create_gene_dict(db): | |
''' | |
Store each feature line db.all_features() as a dict of dicts | |
''' | |
gene_dict = DefaultOrderedDict(lambda: DefaultOrderedDict(lambda: DefaultOrderedDict(list))) | |
for line_no, feature in enumerate(db.all_features()): | |
gene_ids = feature.attributes['gene_id'] | |
feature_type = feature.featuretype | |
if feature_type == 'gene': | |
if len(gene_ids)!=1: | |
logging.warning('Found multiple gene_ids on line {} in gtf'.format(line_no)) | |
break | |
else: | |
gene_id = gene_ids[0] | |
gene_dict[gene_id]['gene'] = feature | |
else: | |
transcript_ids = feature.attributes['transcript_id'] | |
for gene_id in gene_ids: | |
for transcript_id in transcript_ids: | |
gene_dict[gene_id][transcript_id][feature_type].append(feature) | |
return gene_dict | |
# In[ ]: | |
db = gffutils.create_db(gtf, dbfn=gtf_db, | |
merge_strategy='merge', | |
force=True, | |
disable_infer_transcripts=True, | |
disable_infer_genes=True) | |
# In[6]: | |
db = gffutils.FeatureDB(gtf_db, keep_order=True) | |
gene_dict = create_gene_dict(db) | |
# In[7]: | |
def get_gene_list(gene_dict): | |
return list(set(gene_dict.keys())) | |
def get_UTR_regions(utrs, cds): | |
if len(cds)==0: | |
return [], [] | |
utr5_regions = [] | |
utr3_regions = [] | |
cds_sorted = sorted(list(cds), key=lambda x: x.start) | |
first_cds = cds_sorted[0] | |
last_cds = cds_sorted[-1] | |
for orig_utr in utrs: | |
utr = deepcopy(orig_utr) | |
## Push all cds at once | |
## Sort later to remove duplicates | |
strand = utr.strand | |
if utr.start < first_cds.start: | |
if utr.stop >= first_cds.start: | |
utr.stop = first_cds.start - 1 | |
if strand == '+': | |
utr5_regions.append(utr) | |
else: | |
utr3_regions.append(utr) | |
elif utr.stop > last_cds.stop: | |
if utr.start <= last_cds.stop: | |
utr.start = last_cds.stop + 1 | |
if strand == '+': | |
utr3_regions.append(utr) | |
else: | |
utr5_regions.append(utr) | |
return utr5_regions, utr3_regions | |
def create_bed(regions, bedtype='0'): | |
'''Create bed from list of regions | |
bedtype: 0 or 1 | |
0-Based or 1-based coordinate of the BED | |
''' | |
bedstr = '' | |
for region in regions: | |
assert len(region.attributes['gene_id']) == 1 | |
## GTF start is 1-based, so shift by one while writing | |
## to 0-based BED format | |
if bedtype == '0': | |
start = region.start - 1 | |
else: | |
start = region.start | |
bedstr += '{}\t{}\t{}\t{}\t{}\t{}\n'.format(region.chrom, | |
start, | |
region.stop, | |
re.sub('\.\d+', '', region.attributes['gene_id'][0]), | |
'.', | |
region.strand) | |
return bedstr | |
def rename_regions(regions, gene_id): | |
regions = list(regions) | |
if len(regions) == 0: | |
return [] | |
for region in regions: | |
region.attributes['gene_id'] = gene_id | |
return regions | |
def merge_regions(db, regions): | |
if len(regions) == 0: | |
return [] | |
merged = db.merge(sorted(list(regions), key=lambda x: x.start)) | |
return merged | |
def merge_regions_nostrand(db, regions): | |
if len(regions) == 0: | |
return [] | |
merged = db.merge(sorted(list(regions), key=lambda x: x.start), ignore_strand=True) | |
return merged | |
# In[ ]: | |
utr5_bed = '' | |
utr3_bed = '' | |
gene_bed = '' | |
exon_bed = '' | |
intron_bed = '' | |
start_codon_bed = '' | |
stop_codon_bed = '' | |
cds_bed = '' | |
gene_list = [] | |
for gene_id in get_gene_list(gene_dict): | |
gene_list.append(gene_dict[gene_id]['gene']) | |
utr5_regions, utr3_regions = [], [] | |
exon_regions, intron_regions = [], [] | |
star_codon_regions, stop_codon_regions = [], [] | |
cds_regions = [] | |
utr_regions = [] | |
for feature in gene_dict[gene_id].keys(): | |
if feature == 'gene': | |
continue | |
cds = list(gene_dict[gene_id][feature]['CDS']) | |
exons = list(gene_dict[gene_id][feature]['exon']) | |
utrs = list(gene_dict[gene_id][feature]['UTR']) | |
cds = sorted(list(cds), key=lambda x: x.start) | |
exons = sorted(list(exons), key=lambda x: x.start) | |
utrs = sorted(list(utrs), key=lambda x: x.start) | |
merged_exons = merge_regions(db, exons) | |
introns = db.interfeatures(merged_exons) | |
exon_regions += exons | |
intron_regions += introns | |
cds_regions += cds | |
utr_regions += utrs | |
cds_regions = sorted(list(cds_regions), key=lambda x: x.start) | |
utr_regions = sorted(list(utr_regions), key=lambda x: x.start) | |
utr5_regions, utr3_regions = get_UTR_regions(utr_regions, cds_regions) | |
merged_utr5 = merge_regions(db, utr5_regions) | |
renamed_utr5 = rename_regions(merged_utr5, gene_id) | |
merged_utr3 = merge_regions(db, utr3_regions) | |
renamed_utr3 = rename_regions(merged_utr3, gene_id) | |
merged_exons = merge_regions(db, exon_regions) | |
renamed_exons = rename_regions(merged_exons, gene_id) | |
merged_introns = merge_regions(db, intron_regions) | |
renamed_introns = rename_regions(merged_introns, gene_id) | |
merged_cds = merge_regions(db, cds_regions) | |
renamed_cds = rename_regions(merged_cds, gene_id) | |
utr3_bed += create_bed(renamed_utr3) | |
utr5_bed += create_bed(renamed_utr5) | |
exon_bed += create_bed(renamed_exons) | |
intron_bed += create_bed(renamed_introns) | |
cds_bed += create_bed(renamed_cds) | |
gene_bed = create_bed(gene_list) | |
gene_bedtool = pybedtools.BedTool(gene_bed, from_string=True) | |
utr5_bedtool = pybedtools.BedTool(utr5_bed, from_string=True) | |
utr3_bedtool = pybedtools.BedTool(utr3_bed, from_string=True) | |
exon_bedtool = pybedtools.BedTool(exon_bed, from_string=True) | |
intron_bedtool = pybedtools.BedTool(intron_bed, from_string=True) | |
cds_bedtool = pybedtools.BedTool(cds_bed, from_string=True) | |
utr5_cds_subtracted = utr5_bedtool.subtract(cds_bedtool) | |
utr3_cds_subtracted = utr3_bedtool.subtract(cds_bedtool) | |
# In[ ]: | |
utr5_cds_subtracted.remove_invalid().sort().saveas(os.path.join(prefix, 'utr5.bed.gz')) | |
utr3_cds_subtracted.remove_invalid().sort().saveas(os.path.join(prefix, 'utr3.bed.gz')) | |
gene_bedtool.remove_invalid().sort().saveas(os.path.join(prefix, 'gene.bed.gz')) | |
exon_bedtool.remove_invalid().sort().saveas(os.path.join(prefix, 'exon.bed.gz')) | |
intron_bedtool.remove_invalid().sort().saveas(os.path.join(prefix, 'intron.bed.gz')) | |
cds_bedtool.remove_invalid().sort().saveas(os.path.join(prefix, 'cds.bed.gz')) | |
# In[ ]: | |
for gene_id in get_gene_list(gene_dict): | |
start_codons = [] | |
stop_codons = [] | |
for start_codon in db.children(gene_id, featuretype='start_codon'): | |
## 1 -based stop | |
## 0-based start handled while converting to bed | |
start_codon.stop = start_codon.start | |
start_codons.append(start_codon) | |
for stop_codon in db.children(gene_id, featuretype='stop_codon'): | |
stop_codon.start = stop_codon.stop | |
stop_codon.stop = stop_codon.stop+1 | |
stop_codons.append(stop_codon) | |
merged_start_codons = merge_regions(db, start_codons) | |
renamed_start_codons = rename_regions(merged_start_codons, gene_id) | |
merged_stop_codons = merge_regions(db, stop_codons) | |
renamed_stop_codons = rename_regions(merged_stop_codons, gene_id) | |
start_codon_bed += create_bed(renamed_start_codons) | |
stop_codon_bed += create_bed(renamed_stop_codons) | |
start_codon_bedtool = pybedtools.BedTool(start_codon_bed, from_string=True) | |
stop_codon_bedtool = pybedtools.BedTool(stop_codon_bed, from_string=True) | |
start_codon_bedtool.remove_invalid().sort().saveas(os.path.join(prefix, 'start_codon.bed.gz')) | |
stop_codon_bedtool.remove_invalid().sort().saveas(os.path.join(prefix, 'stop_codon.bed.gz')) | |
# In[ ]: | |
stop_codon_bedtool | |
# In[9]: | |
tss = tsses(db, as_bed6=True, merge_overlapping=True) | |
tss.remove_invalid().sort().saveas(os.path.join(prefix, 'tss.bed')) | |
# In[10]: | |
#promoter = tss.slop(l=1500, r=1500, s=True, g=chrsizes) | |
#promoter.remove_invalid().sort().saveas(os.path.join(prefix, 'promoter.1500.bed.gz')) | |
# In[13]: | |
#promoter = tss.slop(l=1000, r=1000, s=True, g=chrsizes) | |
#promoter.remove_invalid().sort().saveas(os.path.join(prefix, 'promoter.1000.bed.gz')) | |
# In[12]: | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment