Skip to content

Instantly share code, notes, and snippets.

@kantale
Last active April 3, 2020 17:33
Show Gist options
  • Save kantale/0d11071991a6cfaa23775f564e1a27d7 to your computer and use it in GitHub Desktop.
Save kantale/0d11071991a6cfaa23775f564e1a27d7 to your computer and use it in GitHub Desktop.
Υλοποίηση του project στο μάθημα "Εισαγωγή στον προγραμματισμό με τη γλώσσα python" 2018-2019
import re
import copy
import gzip
import json
import bisect
import argparse
import requests
from collections import defaultdict, OrderedDict
from pyliftover import LiftOver
'''
Create environment
/Users/alexandroskanterakis/anaconda3/bin/conda create -n python_project python=3.7
Start environment
source /Users/alexandroskanterakis/anaconda3/bin/activate python_project
pip install requests
pip install pyliftover
Deactive
source deactivate
Remove environment
/Users/alexandroskanterakis/anaconda3/bin/conda remove --name python_project --all
'''
consts = {
'gencode': 'gencode.v29.annotation.gff3.gz',
'transcripts': {}, # Transcript data
'lo': LiftOver('hg19', 'hg38'),
'lo_inv': LiftOver('hg38', 'hg19'),
'errors': defaultdict(int),
'messages': {
'pseudoautosomal_lines' : 'pseudoautosomal genes were ignored. Number of lines found:',
'pseudoautosomal_transcripts': 'pseudoautosomal genes were ignored. Number of transcripts found:',
},
'starts': defaultdict(list),
'ends': defaultdict(list),
'transcripts_sorted': defaultdict(list),
}
def lift(chromosome, position, inverse=False):
if type(chromosome) is str:
if not 'chr' in chromosome:
chromosome = 'chr' + chromosome
elif type(chromosome) is int:
chromosome = 'chr' + str(chromosome)
else:
raise Exception('Unknown type of parameter chromosome: {}'.format(type(chromosome).__name__))
if inverse:
s = consts['lo_inv'].convert_coordinate(chromosome, position)
else:
s = consts['lo'].convert_coordinate(chromosome, position)
if len(s) > 1:
print ('WARNING: liftover returned more than one result. Taking the first..')
if len(s) == 0:
raise Exception('Liftover failed..')
return s[0][1]
def parse_gff3_line(l):
ls = l.split()
#Read chromsome
chromosome = ls[0]
# Make sure that chromosome is 'chr\d+'
assert re.match(r'chr[\dXYMT]+', chromosome)
#Annotation source
source= ls[1]
#Make sure that this is either 'HAVANA' or 'ENSEMBL'
assert source in ['HAVANA', 'ENSEMBL']
#Type of the regulatory element (gene, transcript, exon, ...)
t_type = ls[2]
if t_type in ['stop_codon_redefined_as_selenocysteine']:
return None
#Read start and end
start = int(ls[3])
end = int(ls[4])
#Read annotation
#For example: ID=exon:ENST00000450305.2:1;Parent=ENST00000450305.2;gene_id=ENSG00000223972.5 ..
annotation = {key:value for key,value in [x.split('=') for x in ls[8].split(';')]}
return {
'chromosome': chromosome,
'source': source,
'type': t_type,
'start': start,
'end': end,
'annotation': annotation
}
def decode_appris(appris):
'''
Return a integer representation of appris
The lower the better.
'''
return {
'appris_principal_1': 0,
'appris_principal_2': 1,
'appris_principal_3': 2,
'appris_principal_4': 3,
'appris_principal_5': 4,
'appris_alternative_1':5,
'appris_alternative_2':6,
}[appris]
def take_best_appris(transcripts):
'''
From a set of transcripts the one with the lowest apris code
'''
s = sorted(transcripts, key=lambda x: consts['transcripts'][x]['appris'])
return s[0] # FIX ME !!! MIGHT BE MORE THAN LOWEST!!!!
def decode_chromosome(chromosome):
'''
Return an int representation of a chromosome
'''
m = re.match(r'chr([\dXYMT]+)', chromosome)
if m:
return m.group(1)
raise Exception('Cannot decode chromosome: {}'.format(chromosome))
def store_transcript_data(gff3_data):
chromosome = decode_chromosome(gff3_data['chromosome'])
annotations = gff3_data['annotation']
transcript_id = annotations['transcript_id']
gene = annotations['gene_name']
t_type = gff3_data['type']
if '_PAR_' in annotations['ID']:
# These are pseudoautosomal genes
# They have the same transcript id and exist in both X and Y chromosomes..
consts['errors']['pseudoautosomal_lines'] += 1
if t_type == 'transcript':
consts['errors']['pseudoautosomal_transcripts'] += 1
return
start = gff3_data['start']
end = gff3_data['end']
exon_number = int(annotations.get('exon_number', -1))
#Check that exon number makes sense...
if t_type in ['CDS', 'exon', 'start_codon', 'stop_codon', 'three_prime_UTR', 'five_prime_UTR']:
assert exon_number>0
location_tuple = (start, end, exon_number)
#Read appris information
appris = [x for x in annotations['tag'].split(',') if 'appris' in x]
assert len(appris) == 1 # Cannot have more than one appries..
appris = appris[0]
#print (transcript_id)
transcripts = consts['transcripts']
if not transcript_id in transcripts:
transcripts[transcript_id] = {
'gene': gene,
'chr': chromosome,
'start': None,
'end': None,
'exons': [],
'cds': [],
'5utr': [],
'3utr': [],
'start_c': [], # Start codon
'stop_c': [], # Stop codon
'appris': None,
}
transcript_info = transcripts[transcript_id]
#Some quality control
assert transcript_info['gene'] == gene
#Store transcript information in relevant fields
if t_type == 'transcript':
assert transcript_info['appris'] is None
transcript_info['appris'] = decode_appris(appris)
assert transcript_info['start'] is None
assert transcript_info['end'] is None
transcript_info['start'] = start
transcript_info['end'] = end
elif t_type == 'exon':
transcript_info['exons'].append(location_tuple)
elif t_type == 'CDS':
transcript_info['cds'].append(location_tuple)
elif t_type == 'start_codon':
#assert transcript_info['start_c'] is None # Cannot have two start codons.. Yet.. some.. have
transcript_info['start_c'].append(location_tuple)
elif t_type == 'stop_codon':
#assert transcript_info['stop_c'] is None # Cannot have two stop codons..
transcript_info['stop_c'].append(location_tuple)
elif t_type == 'five_prime_UTR':
transcript_info['5utr'].append(location_tuple)
elif t_type == 'three_prime_UTR':
transcript_info['3utr'].append(location_tuple)
else:
raise Exception('Unknown regulatory element: {}'.format(t_type))
def parse_gff3(fn):
'''
Parse gff3 file
'''
print ('Start parsing:', fn)
with gzip.open(fn, 'rt') as f:
line_counter = 0
for l in f:
line_counter += 1
if line_counter % 100000 == 0:
print ('Lines: {0} {1:.2f}%'.format(line_counter, 100*line_counter/2739404))
if l[0] == '#': # This is a comment
continue
if not 'appris' in l: # Ignore transcripts without appris information
continue
gff3_data = parse_gff3_line(l)
if not gff3_data:
continue
store_transcript_data(gff3_data)
#if line_counter > 10000:
# break
#if gff3_data['chromosome'] != 'chr1':
# break
print ('Done parsing:', fn)
def int_commas(i):
'''
1234 --> 1,234
12345 --> 12,345
123456 --> 123,456
1234567 --> 1,234,567
...
'''
s = str(i)
return ','.join([s[::-1][x:x+3][::-1] for x in range(0,len(s),3)][::-1])
# return ','.join([s[0 if x-3<0 else x-3:x] for x in range(len(s),0,-3)][::-1]) # Another way
def format_pos(p):
'''
padding.
Making sure that the string reporesentation of
position takes always exactly 11 characters
The padding is equal-ish to the left and right
'''
s = int_commas(p)
padding = 11-len(s)
padding_left = padding//2
padding_right = padding - padding_left
ret = (' ' * padding_left) + s + (' ' * padding_right)
assert len(ret) == 11
return ret
def format_exon(p):
'''
exon number
'''
if p < 10:
return str(p) + ' '
return str(p)
def exon_box_tag(name):
d = {
'5utr': '5\'UTR',
'3utr': '3\'UTR',
'utr5': '5\'UTR',
'utr3': '3\'UTR',
'cds' : ' CDS'
}
return d[name]
def exon_str_1(number, first):
'''
print (exon_str_1(50))
'''
return '''
╔══════════════╗
║ Exon {} ║
╟──────────────╢
║ {} ║
╟──────────────╢
║ ║
║ ║
'''.format(format_exon(number), exon_box_tag(first))
def exon_str_2(number, first, second):
'''
print (exon_str_2(4, '3utr', 'cds'))
print (exon_str_2(4, 'cds', '5utr'))
print (exon_str_2(4, 'cds', 'cds'))
print (exon_str_2(4, '3utr', '3utr'))
'''
return '''
╔═════════════════════════════╗
║ Exon {} ║
╟──────────────┬──────────────╢
║ {} │ {} ║
╟──────────────┼──────────────╢
║ │ ║
║ │ ║
'''.format(format_exon(number), exon_box_tag(first), exon_box_tag(second))
def exon_str_3(number, first, second, third):
return '''
╔════════════════════════════════════════════╗
║ Exon {} ║
╟──────────────┬──────────────┬──────────────╢
║ {} │ {} │ {} ║
╟──────────────┼──────────────┼──────────────╢
║ │ │ ║
║ │ │ ║
'''.format(format_exon(number), exon_box_tag(first), exon_box_tag(second), exon_box_tag(third))
def exon_join():
'''
Oh christmas tree...
'''
return '''
╱╲
╱ ╲
╱ ╲
╱ ╲
╱ ╲
╱ ╲
╱ ╲
'''
def space_check(space_1, space_2):
'''
Returns True if space1 is BEFORE space_2
Returns False is space1 is AFTER space_2
Throws excpetion in case of overlap
'''
if space_1[0] <= space_1[1] <= space_2[0] <= space_2[1]:
return True
if space_2[0] <= space_2[0] and space_1[0] <= space_1[1]:
return False
raise Exception('Space overlap: {} {}'.format(space_1, space_2))
def exon_map(exon, cds, utr_3, utr_5):
'''
Return a box for each exon
Return the positions of the edges of the boxes
'''
exon_number = exon[2]
if utr_3 is None and utr_5 is None:
assert exon == cds # If not utrs present exon and cds should coincide
return exon_str_1(exon_number, 'cds') , (exon[0], exon[1])
elif cds is None and utr_5 is None:
assert exon == utr_3
return exon_str_1(exon_number, 'utr3'), (exon[0], exon[1])
elif cds is None and utr_3 is None:
assert exon == utr_5
return exon_str_1(exon_number, 'utr5'), (exon[0], exon[1])
elif utr_3 is None:
assert utr_5 # Redundant but ok..
#Which is first, CDS or UTR5?
s_check = space_check(cds, utr_5)
if s_check:
assert exon[0] == cds[0]
assert cds[1] == utr_5[0] - 1
assert exon[1] == utr_5[1]
return exon_str_2(exon_number, 'cds', 'utr5'), (exon[0], cds[1], exon[1])
else:
assert exon[0] == utr_5[0]
assert utr_5[1] == cds[0] - 1
assert exon[1] == cds[1]
return exon_str_2(exon_number, 'utr5', 'cds'), (exon[0], utr_5[1], exon[1])
elif utr_5 is None:
assert utr_3 # Redundancy is bliss..
s_check = space_check(cds, utr_3)
if s_check:
assert exon[0] == cds[0]
assert cds[1] == utr_3[0] - 1
assert exon[1] == utr_3[1]
return exon_str_2(exon_number, 'cds', 'utr3'), (exon[0], cds[0], exon[1])
else:
assert exon[0] == utr_3[0]
assert utr_3[1] == cds[0] - 1
assert exon[1] == cds[1]
return exon_str_2(exon_number, 'utr3', 'cds'), (exon[0], utr_3[1], exon[1])
else:
assert cds
assert utr_3
assert utr_5
# 69055, 70108, 1) (69091, 70008, 1) (70009, 70108, 1) (69055, 69090, 1)
# Which is first, utr_3 or utr_5 ?
s_check = space_check(utr_3, utr_5)
if s_check:
assert utr_3[0] <= utr_3[1] <= cds[0] <= cds[1] <= utr_5[0] <= utr_5[1]
return exon_str_3(exon_number, 'utr3', 'cds', 'utr5'), (exon[0], cds[0], utr_5[0], exon[1])
else:
assert utr_5[0] <= utr_5[1] <= cds[0] <= cds[1] <= utr_3[0] <= utr_3[1]
return exon_str_3(exon_number, 'utr5', 'cds', 'utr3'), (exon[0], cds[0], utr_3[0], exon[1])
raise Exception('This should never happen..')
def isoform_map(transcript, position=None):
'''
position: position to Highligh!
'''
tr_data = consts['transcripts'][transcript]
def element_getter(element, exon_number):
ret = [x for x in tr_data[element] if x[2]==exon_number]
if len(ret) > 1:
raise Exception('Transcript: {} exon_number: {} has more than one: {}'.
format(transcript, exon_number, element))
if ret:
return ret[0]
return None # Do not have do.. but always nice and readable
#Order exons accorsing to position
exons_ordered = sorted(tr_data['exons'])
boxes = []
positions = []
for exon in exons_ordered:
exon_number = exon[2]
#Get all elements of the exon
cds = element_getter('cds', exon_number)
utr_3 = element_getter('3utr', exon_number)
utr_5 = element_getter('5utr', exon_number)
if boxes:
boxes.append(exon_join())
exon_box, exon_positions = exon_map(exon, cds, utr_3, utr_5)
boxes.append(exon_box)
positions.append(exon_positions)
boxes = [x.split('\n')[1:] for x in boxes] # Remove first line
assert len({len(x) for x in boxes}) <= 1 # Each "box" should have the same number of lines
boxes_str = '\n'.join([' '+''.join(x) for x in zip(*boxes)]) # "Zip" boxes together
# print (boxes_str)
boxes_str = boxes_str[:-5] # Remove enter at the end of boxes
all_positions = [y for x in positions for y in x] # Flatten positions
positions_str = '' + ' '.join([format_pos(x) for x in all_positions])
# print (positions_str)
complete_str = boxes_str + positions_str
if position:
position_index = bisect.bisect(all_positions, position)
#print ('bisect:', position_index)
if all_positions[position_index-1] == position:
position_line = ' ' *(15*(position_index-1)) + format_pos(position)
else:
position_line = ' ' *(15*position_index-7) + format_pos(position)
complete_str = complete_str + '\n' + position_line
return complete_str
#print (tr_data)
#isoform_map('ENST00000433179.3')
def test_isoform_map():
'''
For all transcripts check isoform maps
'''
print ('Generating all possible isoforms!')
d= open('del.isoforms', 'w')
for counter, transcript in enumerate(consts['transcripts']):
if counter % 100 == 0:
print ('Isoforms:', counter)
m = isoform_map(transcript)
d.write(m)
d.write('\n')
#print (m)
if counter > 10000:
break
print ('Testing done. Total: ', counter)
d.close()
def test_isoform_map_2():
print (isoform_map('ENST00000332831.4'))
print (isoform_map('ENST00000332831.4', 685700))
print (isoform_map('ENST00000332831.4', 686000))
print (isoform_map('ENST00000332831.4', 686660))
print (isoform_map('ENST00000332831.4', 685679))
print (isoform_map('ENST00000332831.4', 685716))
print (isoform_map('ENST00000332831.4', 686673))
def search_intervals(starts, ends, position):
'''
Assumes intervals of numbers.
zip(starts, ends)
which indexes of these contain the position?
'''
assert len(starts) == len(ends)
if position < starts[0]:
return () ## Too small position
if position > ends[-1]:
return ()
start_position = bisect.bisect_right(starts, position)
# We do know that all position right from start_position are not suitable
#print (start_position)
if start_position == 0:
return () ## Too small position
if start_position == len(starts):
start_position -= 1
ret = []
offset = 0
while True:
current_pos = start_position-offset
if current_pos < 0:
break
current_start = starts[current_pos]
current_end = ends[current_pos]
if current_start <= position <= current_end:
ret.append(current_pos)
else:
if offset>=1:
break
offset += 1
return ret
def test_search_intervals():
a = [20,50,80]
b = [40,60,100]
print (a)
print (b)
print (search_intervals(a,b,55), '1')
print (search_intervals(a,b,30), '0')
print (search_intervals(a,b,85), '2')
print (search_intervals(a,b,15), '<empty>')
print (search_intervals(a,b,110), '<empty>')
print (search_intervals(a,b,20), '0')
print (search_intervals(a,b,40), '0')
print (search_intervals(a,b,50), '1')
print (search_intervals(a,b,60), '1')
print (search_intervals(a,b,80), '2')
print (search_intervals(a,b,100), '2')
print (search_intervals(a,b,45), '<empty>', 45)
print (search_intervals(a,b,70), '<empty>', 70)
a = [20,50,80]
b = [60,60,100]
print (a)
print (b)
print (search_intervals(a,b,55), '1,0')
print (search_intervals(a,b,60), '1,0')
print (search_intervals(a,b,50), '1,0')
print (search_intervals(a,b,40), '0')
print (search_intervals(a,b,20), '0')
print (search_intervals(a,b,19), '<empty>')
print (search_intervals(a,b,85), '2')
print (search_intervals(a,b,80), '2')
print (search_intervals(a,b,100), '2')
print (search_intervals(a,b,105), '<empty>')
a = [20,50,80]
b = [100,100,100]
print (a)
print (b)
print (search_intervals(a,b,90), '2,1,0')
print (search_intervals(a,b,30), '0')
print (search_intervals(a,b,60), '0,1')
def search_empty_intervals(starts, ends, position):
'''
Get empty intervals
'''
start_position = bisect.bisect(starts, position)
if start_position == 0:
return (None, 0)
if start_position == len(starts):
return (start_position-1, None)
return (start_position-1, start_position)
def index_transcripts():
print ('Indexing transcripts..')
transcripts = consts['transcripts']
sorted_positions = sorted((v['chr'], v['start'], v['end'], k) for k, v in transcripts.items())
#consts['all_chromosomes'] = {v['chr'] for k,v in transripts.item()}
for chromosome, start, end, transcript in sorted_positions:
consts['starts'][chromosome].append(start)
consts['ends'][chromosome].append(end)
consts['transcripts_sorted'][chromosome].append(transcript)
print (' Indexing transcripts DONE.')
def index_genes():
print ('Indexing genes..')
transcripts = consts['transcripts']
# Create a dictionary. Key = gene, values = list of transcripts
genes = defaultdict(list)
_ = [genes[v['gene']].append(k) for k,v in transcripts.items()]
genes = {k:take_best_appris(v) for k,v in genes.items()}
consts['genes'] = genes
# Bug: Previous of first gene in chromosome 2 is the last of chromosome 1..
consts['genes_ordered'] = OrderedDict(sorted(((k, (transcripts[v]['chr'], transcripts[v]['start'], transcripts[v]['end'])) for k,v in genes.items()), key=lambda x: x[1]))
consts['genes_ordered_keys'] = list(consts['genes_ordered'].keys())
consts['genes_ordered_values'] = list(consts['genes_ordered'].values())
print (' Indexing genes DONE.')
def search_transcripts(chromosome, position):
'''
Get all the transcripts with that position
print(search_transcripts(686655)) # (True, ['ENST00000332831.4'])
print(search_transcripts(500000)) # (False, ['ENST00000426406.3', 'ENST00000332831.4'])
print(search_transcripts(50000)) # (False, [None, 'ENST00000335137.4'])
'''
found_intervals = search_intervals(consts['starts'][chromosome], consts['ends'][chromosome], position)
if found_intervals:
return True, [consts['transcripts_sorted'][chromosome][x] for x in found_intervals]
intervals = search_empty_intervals(consts['starts'][chromosome], consts['ends'][chromosome], position)
return False, [consts['transcripts_sorted'][chromosome][x] if x else None for x in intervals]
#======================QUESTIONS============================
def configure_regexp():
refseq = r'NM_[\d]+\.[\d]+|NM_[\d]+'
ensembl = r'ENST[\d]+\.[\d]+|ENST[\d]+'
chromosome = r'1|2|3|4|5|6|7|8|9|10|11|12|13|14|15|16|17|18|19|20|21|22|x|y|X|Y'
transcript = r'(?P<refseq>{})|(?P<ensembl>{})|(?P<gene>[\w]+)|(?P<hgvs_chromosome>{})'.format(refseq, ensembl, chromosome)
chromosome_decl = r'chromosome|chr'
split_chr_position = r':|,|and[\s]+position|and'
rs = r'rs[\d]+'
mutation_change = r'[ACGT]>[ACGT]'
chr_mutation = r'({})?[\s]*(?P<chromosome>{})[\s]*({})[\s]*(?P<position>[\d]+)[\s]*(?P<change>{})?'.format(chromosome_decl, chromosome, split_chr_position, mutation_change) # chr 1 : 234234234
hgvs_mutation = r'({}):[cg]\.[\d]+({})'.format(transcript, mutation_change)
mutation=r'(?P<rs>{})|(?P<hgvs_mutation>{})|(?P<chr_mutation>{})'.format(rs, hgvs_mutation, chr_mutation)
#print (mutation)
mut_reg = re.compile(mutation)
consts['mut_regexp'] = mut_reg
if False: # To test
m = mut_reg.search('rs12345')
print(m.groupdict())
m = mut_reg.search('1:123')
print(m.groupdict())
m = mut_reg.search('chr1:123')
print(m.groupdict())
m = mut_reg.search('chromosome 1:123')
print(m.groupdict())
m = mut_reg.search('chromosome 1 and position 123')
print(m.groupdict())
m = mut_reg.search('chromosome 1 and 123')
print(m.groupdict())
m = mut_reg.search('NM_234234:c.100A>G')
print(m.groupdict())
m = mut_reg.search('ENST00022222:c.100A>G')
print(m.groupdict())
m = mut_reg.search('2:g.100A>G')
print(m.groupdict())
m = mut_reg.search('chr2:g.100A>G')
print(m.groupdict())
def get_location_from_regexp_match(match):
'''
python project.py --question " rs58991260 " --> Chromosome: 1 Position: 218458480 . bnelongs in between transcripts
python project.py --question " rs262656 "
python project.py --question " rs780472451 " --> Coding Sequence Variant
python project.py --question " 1:2157315 "
python project.py --question " AGT:c.803T>C "
python project.py --question " NM_003193.4:c.464T>A "
python project.py --question " NM_003002.3:c.274G>T "
python project.py --question " ENST00000003084:c.1431T>G "
'''
#print (match)
if match['rs']:
location, change = myvariant(match['rs'], task='fetch_location_of_dbsnp')
return location, change
if match['chr_mutation']:
location = {'chromosome': match['chromosome'], 'position': int(match['position'])}
change = match['change']
return location, chang
if match['hgvs_mutation']:
# There are three posibilities: GENE, ENSEMBL, REFSEQ
if match['gene']:
location, change = vep(match['hgvs_mutation'], task="fetch_location_from_HGVS") # AGT:c.803T>C
return location, change
elif match['refseq']:
location, change = mutalyzer(match['hgvs_mutation'], task='') # NM_003002.3:c.274G>T
return location, change
elif match['ensembl']:
location, change = vep(match['hgvs_mutation'], task="fetch_location_from_HGVS") # ENST00000003084:c.1431T>G
return location, change
else:
raise Exception('This should never happen')
else:
raise Exception('Could not extract location from match')
def answer(question):
'''
rs58991260 . in chromosome 1
'''
genes_ordered_values = consts['genes_ordered_values']
genes_ordered_keys = consts['genes_ordered_keys']
transcripts = consts['transcripts']
print ('The question asked is:')
print ('--> ' + question)
to_print = []
gene = None
transcript = None
primary_transcript = None
location = None
change = None
m = re.search(r'gene[\s]+([\w]+)', question)
if m:
gene = m.group(1)
gene = gene.upper()
if gene in consts['genes']:
transcript = consts['genes'][gene]
primary_transcript = transcript
else:
gene = None
#Check if there is a location in the question.
question_locations = consts['mut_regexp'].search(question)
if question_locations:
question_locations_d = question_locations.groupdict()
location, change = get_location_from_regexp_match(question_locations_d)
# Search for transcripts
found, transcripts_found = search_transcripts(**location)
if not found: # rs58991260
to_print.append('Position of : {} does not belong in any transcript'.format(question_locations.group(0)))
to_print.append('It belongs in between these transcripts')
to_print.append(' '.join(transcripts_found))
else: #rs262656
to_print.append('Found the following transcripts:')
for x in transcripts_found:
to_print.append(' {} --> Gene:{} appris: {}'.format(x, transcripts[x]['gene'], transcripts[x]['appris']))
primary_transcript = take_best_appris(transcripts_found)
gene = [transcripts[x]['gene'] for x in transcripts_found if x==primary_transcript][0]
transcript = primary_transcript
if location:
to_print.append('Chromosome: {chromosome} Position: {position}'.format(**location))
# Check # myvariant(location={'chromosome':9, 'position':107620835 }, task="fetch_variants_from_location")
# Get all HGVS and rs from this location
myvariant_variants = myvariant(location=copy.copy(location), task="fetch_variants_from_location")
to_print.append('Found variants on this location:')
for x in myvariant_variants:
to_print.append(' {variant} --> {rs}'.format(**x))
position_from = max(0, int(location['position'])-50)
position_to = int(location['position']) + 50
sequence = ensembl_region(location['chromosome'], position_from, position_to)
to_print.append('Reference Sequence: {} FROM:{}(-50) TO: {}(+50)'.format(location['chromosome'], position_from, position_to))
to_print.append(sequence)
if location and change:
to_print.append(myvariant(variant={
'chromosome':location['chromosome'],
'position': location['position'],
'change':change}, task="fetch_info_from_variant"))
if gene:
to_print.append('Gene: {}'.format(gene))
gene_info = mygene(gene, 'get_info')
to_print.append('Gene name: {}'.format(gene_info['name']))
to_print.append('Type of gene: {}'.format(gene_info['type_of_gene']))
to_print.append('Other Gene names:')
to_print.append(gene_info['other_names'])
to_print.append('Summary:')
to_print.append(gene_info['summary'])
to_print.append('Gene location information:')
to_print.append(gene_info['location'])
to_print.append('Refseq Translation:')
to_print.append(gene_info['refseq'])
to_print.append('Pathways:')
to_print.append(gene_info['pathway'])
to_print.append('This gene also exists on the following organisms:')
to_print.append(gene_info['taxonomy'])
if primary_transcript:
to_print.append('Primary Transcript: {}'.format(primary_transcript))
#if any((x in question for x in ['upstream', 'downstream', 'closer', 'closest'])) and gene:
if gene:
location_index = consts['genes_ordered_keys'].index(gene)
decorate_with_stars = lambda x,right : '*' + x + '*' if x==right else x
docorate_with_positions = lambda i : int_commas(genes_ordered_values[i][1]) + ' ' + decorate_with_stars(genes_ordered_keys[i], gene) + ' ' + int_commas(genes_ordered_values[i][2])
f = max(0, location_index-3)
t = min(len(consts['genes']), location_index+3)
to_show = [docorate_with_positions(i) for i in range(f,t+1)]
to_print.append('Downstream / Upstream genes:')
to_print.append((' '*4) + ' --> '.join(to_show))
if 'transcripts' in question and gene:
to_print.append('Transcripts of gene: {}'.format(gene))
tr = [k for k,v in transcripts.items() if v['gene'] == gene]
for i, x in enumerate(tr):
if x == primary_transcript:
to_print.append('{} {} *PRIMARY*'.format(i+1, x))
else:
to_print.append('{} {}'.format(i+1, x))
if transcript:
to_print.append('Isoform map of {}:'.format(transcript))
to_print.append(isoform_map(transcript, position = location['position'] if location else None))
print('\n'.join(to_print))
#==================================================
# MyVariant.info
# Example of result from variant.infgo from rs58991260
'''
{
"max_score": 15.177358,
"took": 3,
"total": 1,
"hits": [
{
"_id": "chr1:g.218631822G>A",
"_score": 15.177358,
"dbsnp": {
"_license": "https://goo.gl/Ztr5rl",
"allele_origin": "unspecified",
"alleles": [
{
"allele": "G",
"freq": 0.9784
},
{
"allele": "A",
"freq": 0.02157
}
],
"alt": "A",
"chrom": "1",
"class": "SNV",
"dbsnp_build": 129,
"flags": [
"ASP",
"G5",
"GNO",
"KGPhase1",
"KGPhase3",
"SLO"
],
"gmaf": 0.02157,
"hg19": {
"end": 218631822,
"start": 218631822
},
"ref": "G",
"rsid": "rs58991260",
"validated": false,
"var_subtype": "ts",
"vartype": "snp"
}
}
]
}
'''
def json_pprint(j):
print (json.dumps(j, indent=4))
def myvariant(variant=None, location=None, task=None):
'''
variant can be rs..
http://myvariant.info/v1/variant/chr9:g.107620835G>A
http://myvariant.info/v1/variant/chr9:g.107620835G>A?fields=dbsnp.rsid,dbsnp.vartype
http://myvariant.info/v1/query?q=chr9:107620835&fields=gnomad_genome.af,exac.af,dbnsfp.1000gp3.af
http://myvariant.info/v1/query?q=rs58991260
http://myvariant.info/v1/query?q=chr9:107620835&fields=_id,dbsnp.rsid
myvariant(location={'chromosome':9, 'position':107620835 }, task="fetch_variants_from_location")
myvariant(variant={'chromosome':19, 'position': 44908684, 'change':'T>C'}, task="fetch_info_from_variant")
'''
def lift_hgvs(hgvs):
s = re.search(r'chr([\d+XY]+):g\.([\d]+)([ACGT]>[ACGT])', hgvs)
p = lift(chromosome=s.group(1), position=int(s.group(2)))
return 'chr{}:g.{}{}'.format(s.group(1), p, s.group(3))
if task == 'fetch_location_of_dbsnp':
url = 'http://myvariant.info/v1/query'
parameters = {
'q': variant,
'fields': 'dbsnp',
#'hg38': 'true', # Makes no differnce
}
elif task == 'fetch_info_from_variant':
variant['position'] = lift(variant['chromosome'], variant['position'], inverse=True)
this_variant = 'chr{chromosome}:g.{position}{change}'.format(**variant)
url = f'http://myvariant.info/v1/variant/{this_variant}'
parameters = {
'fields': 'gnomad_genome.af,exac.af,dbnsfp.1000gp3.af,clinvar,snpeff.ann,cadd.polyphen,cadd.sift.cat'
}
elif task == 'fetch_variants_from_location':
url = 'http://myvariant.info/v1/query'
location['position'] = lift(location['chromosome'], location['position'], inverse=True)
parameters = {
'q': 'chr{chromosome}:{position}'.format(**location),
'fields': '_id,dbsnp.rsid',
#'hg38': 'true', # No effect ??
}
else:
raise Exception('Unknown task: {}'.format(task))
r = requests.get(url, params=parameters)
if not r.ok:
r.raise_for_status()
j = r.json()
#json_pprint(j)
if task == 'fetch_location_of_dbsnp':
chromosome = j['hits'][0]["dbsnp"]["chrom"]
position_19 = j['hits'][0]["dbsnp"]["hg19"]["start"]
position_38 = lift(chromosome, position_19)
change = '{}>{}'.format(j['hits'][0]['dbsnp']['ref'], j['hits'][0]['dbsnp']['alt'])
return {'chromosome': chromosome, 'position': position_38}, change
elif task == 'fetch_info_from_variant':
ret = []
if 'dbnsfp' in j:
if '1000gp3' in j['dbnsfp']:
if 'af' in j['dbnsfp']['1000gp3']:
ret.append('1000 Genomes Allele Frequency: {}'.format(j['dbnsfp']['1000gp3']['af']))
if 'exac' in j:
if 'af' in j['exac']:
ret.append('ExAC Allele Frequency: {}'.format(j['exac']['af']))
if 'gnomad_genome' in j:
if 'af' in j['gnomad_genome']['af']:
if 'af' in j['gnomad_genome']['af']:
ret.append('gnomAD Allele Frequency: {}'.format(j['gnomad_genome']['af']['af']))
if 'clinvar' in j:
if 'rcv' in j['clinvar']:
ret.append('Clinvar:')
ret.extend([' {} --> {} --> {} '.format(x['clinical_significance'], x['conditions']['name'], x['review_status']) for x in j['clinvar']['rcv']])
if 'snpeff' in j:
if 'ann' in j['snpeff']:
ret.append('snpeff:')
if type(j['snpeff']['ann']) is list:
#json_pprint(j['snpeff']['ann'])
ret.extend([' {}:{} --> {}'.format(x['feature_id'], x['hgvs_c'], x['effect']) for x in j['snpeff']['ann']])
else:
if 'effect' in j['snpeff']['ann']:
ret.append(' {}:{} --> {}'.format(j['snpeff']['ann']['feature_id'], j['snpeff']['ann']['hgvs_c'], j['snpeff']['ann']['effect']))
if 'cadd' in j:
if 'polyphen' in j['cadd']:
if 'cat' in j['cadd']['polyphen']:
ret.append('polyphen category: {}'.format(j['cadd']['polyphen']['cat']))
if 'sift' in j['cadd']:
if 'cat' in j['cadd']['sift']:
ret.append('SIFT category: {}'.format(j['cadd']['sift']['cat']))
ret = '\n'.join(ret)
return ret
elif task == 'fetch_variants_from_location':
# http://myvariant.info/v1/query?q=chr9:107620835&fields=_id,dbsnp.rsid
return [{'variant': lift_hgvs(x['_id']), 'rs': x.get('dbsnp', {'rsid': None})['rsid']} for x in j['hits']]
else:
raise Exception('Unknown task: {}'.format(task))
#================End of MyVariant.info #############
#==================================================
#==============VEP =====================
'''
[
{
"id": "AGT:c.803T>C",
"seq_region_name": "1",
"strand": -1,
"colocated_variants": [
{
"start": "230710048",
"strand": "1",
"seq_region_name": "1",
"end": "230710048",
"allele_string": "HGMD_MUTATION",
"phenotype_or_disease": "1",
"id": "CM920010"
},
{
"strand": "1",
"seq_region_name": "1",
"start": "230710048",
"minor_allele": "A",
"phenotype_or_disease": "1",
"minor_allele_freq": "0.2949",
"frequencies": {
"C": {
"gnomad_sas": "0.6204",
"eas": "0.8532",
"amr": "0.6354",
"gnomad_nfe": "0.4202",
"gnomad_asj": "0.4391",
"gnomad_oth": "0.5164",
"aa": "0.8268",
"gnomad_amr": "0.7185",
"gnomad_fin": "0.4394",
"afr": "0.9032",
"ea": "0.4258",
"sas": "0.636",
"gnomad": "0.5464",
"gnomad_afr": "0.8485",
"gnomad_eas": "0.8378",
"eur": "0.4115"
}
},
"clin_sig": [
"benign",
"risk_factor"
],
"id": "rs699",
"end": "230710048",
"pubmed": [
"21988197",
"9831339",
"20486282",
"21681796"
],
"allele_string": "A/G"
}
],
"most_severe_consequence": "missense_variant",
"input": "AGT:c.803T>C",
"allele_string": "T/C",
"transcript_consequences": [
{
"cds_start": 803,
"polyphen_prediction": "benign",
"sift_prediction": "tolerated",
"sift_score": 1,
"gene_symbol": "AGT",
"biotype": "protein_coding",
"gene_id": "ENSG00000135744",
"protein_end": 268,
"cdna_start": 1018,
"polyphen_score": 0,
"hgnc_id": "HGNC:333",
"amino_acids": "M/T",
"consequence_terms": [
"missense_variant"
],
"codons": "aTg/aCg",
"protein_start": 268,
"variant_allele": "C",
"cds_end": 803,
"impact": "MODERATE",
"strand": -1,
"transcript_id": "ENST00000366667",
"gene_symbol_source": "HGNC",
"cdna_end": 1018
},
{
"variant_allele": "C",
"transcript_id": "ENST00000412344",
"gene_id": "ENSG00000244137",
"strand": -1,
"impact": "MODIFIER",
"distance": 650,
"gene_symbol_source": "Clone_based_ensembl_gene",
"gene_symbol": "AL512328.1",
"consequence_terms": [
"downstream_gene_variant"
],
"biotype": "antisense"
}
],
"end": 230710048,
"assembly_name": "GRCh38",
"start": 230710048
}
]
'''
def vep(variant, task):
'''
https://rest.ensembl.org/documentation/info/vep_hgvs_get
AGT:c.803T>C
'''
server = "https://rest.ensembl.org"
ext = "/vep/human/hgvs/{}?".format(variant)
parameters = {
"content-type": "application/json"
}
r = requests.get(server+ext, headers={ "Content-Type" : "application/json"}, params=parameters)
if not r.ok:
r.raise_for_status()
decoded = r.json()
#json_pprint(decoded)
chromosome = decoded[0]['seq_region_name']
position = int(decoded[0]['start'])
change = decoded[0]['allele_string']
if task == 'fetch_location_from_HGVS':
location = {'chromosome': chromosome, 'position': position}
return location, change
else:
raise Exception('Uknown task: {}'.format(task))
#==========END OF VEP ==================
### ensembl region ###
def ensembl_region(chromosome, f, t):
'''
ensembl_region('X', 1000000, 1000100)
https://rest.ensembl.org/sequence/region/human/X:1000000..1000100:1?content-type=text/plain
'''
if t<=f:
raise Exception('from should be greater than to')
url = f'https://rest.ensembl.org/sequence/region/human/{chromosome}:{f}..{t}:1'
parameters = {
'content-type' : 'text/plain',
}
r = requests.get(url, params=parameters)
if not r.ok:
r.raise_for_status()
t = r.text
return t
### End of ensebl region
########### mutalyzer ##############
def mutalyzer(variant, task):
'''
https://mutalyzer.nl/soap-api
https://mutalyzer.nl/json/runMutalyzer?variant=NM_003193.4:c.464T%3EA
NM_003193.4:c.464T>A
NM_003002.3:c.274G>T
https://mutalyzer.nl/json/numberConversion?variant=NM_003002.3:c.274G%3ET&build=hg38
'''
url='https://mutalyzer.nl/json/numberConversion'
parameters = {
"variant" : variant,
"build" : "hg38",
}
r = requests.get(url, params=parameters)
if not r.ok:
r.raise_for_status()
decoded = r.json() # NC_000011.10:g.112088971G>T
m = re.search(r'NC_([\d]+)\.[\d]+:g\.([\d]+)([ACGT]>[ACGT])', decoded[0])
chromosome = int(m.group(1))
position = int(m.group(2))
change = m.group(3)
location = {'chromosome': chromosome, 'position': position}
return location, change
########### end of mutalyzer ########
############ mygene.info #######################
def mygene_taxonomy(gene):
tax_names = {
9606 : ('human', 'Homo sapiens'),
10090 : ('mouse', 'Mus musculus'),
10116 : ('rat', 'Rattus norvegicus'),
7227 : ('fruitfly', 'Drosophila melanogaster'),
6239 : ('nematode', 'Caenorhabditis elegans'),
7955 : ('zebrafish', 'Danio rerio'),
3702 : ('thale-cress', 'Arabidopsis thaliana'),
8364 : ('frog', 'Xenopus tropicalis'),
9823 : ('pig', 'Sus scrofa'),
}
url = 'http://mygene.info/v3/query'
parameters = {
'q': "symbol:{}".format(gene),
'fields': 'taxid'
}
r = requests.get(url, params = parameters)
if not r.ok:
raise r.raise_for_status()
decoded = r.json()
#json_pprint(decoded)
tax = list(filter(bool, [tax_names.get(int(x['taxid']), False) for x in decoded['hits']]))
if tax:
ret = '\n'.join([' {} --> {}'.format(*x) for x in tax])
return ret
return ''
def mygene(gene, task):
'''
By default is in hg38
All fields
http://docs.mygene.info/en/latest/doc/data.html
http://mygene.info/v3/query?q=symbol:tpmt&fields=name&species=human
Test with: mygene('TPMT', 'get_names')
'''
def is_mygene_chromosome(x):
'''
Exclude things like: "chr": "CHR_HSCHR1_5_CTG32_1"
'''
return x.upper() in list(map(str, range(1,23))) + ['X', 'Y']
url = 'http://mygene.info/v3/query'
if task == 'get_info':
fields = 'name,other_names,map_location,genomic_pos,refseq,pathway,summary,type_of_gene'
else:
raise Exception('Unknown task for mygene: {}'.format(task))
parameters = {
'q': gene,
'species' : 'human',
'fields': fields,
#'hg38': 'true',
}
r = requests.get(url, params=parameters)
if not r.ok:
r.raise_for_status()
decoded = r.json()
#json_pprint(decoded)
first_hit = decoded['hits'][0]
if type(first_hit['genomic_pos']) is list:
chromosomes = {x['chr'] for x in first_hit['genomic_pos'] if is_mygene_chromosome(x['chr'])}
assert len(chromosomes) == 1 # Can it have more than in chromosomes?
chromosome = list(chromosomes)[0]
location = '\n'.join(["Ensemble Gene: {ensemblgene} Chromosome: {chr} Start: {start} End:{end}".format(**x) for x in first_hit['genomic_pos'] if x['chr'] == chromosome])
else:
chromosome = first_hit['genomic_pos']['chr']
location = "Ensemble Gene: {ensemblgene} Chromosome: {chr} Start: {start} End:{end}".format(**first_hit['genomic_pos'])
# Create refseq
refseq = '\n'.join([" {rna} --> {protein} ".format(**x) for x in first_hit['refseq']['translation']])
# Pathway
pathway = ''
for pathway_db in ['kegg', 'pharmgkb', 'reactome', 'smpdb', 'wikipathways']:
if pathway_db in first_hit['pathway']:
#print (pathway_db, first_hit['pathway'][pathway_db])
pathway += pathway_db + '\n'
if type(first_hit['pathway'][pathway_db]) is list:
pathway += '\n'.join([" {id} --> {name}".format(**x) for x in first_hit['pathway'][pathway_db]])
else:
pathway += " {id} --> {name}".format(**first_hit['pathway'][pathway_db])
ret = {
'name' : first_hit['name'],
'other_names' : '\n'.join(map(lambda x: ' ' + x, first_hit['other_names'])),
'map_location': first_hit['map_location'],
'location' : location,
'refseq' : refseq,
'pathway': pathway,
'summary': first_hit['summary'],
'taxonomy': mygene_taxonomy(gene),
'type_of_gene': first_hit['type_of_gene']
}
return ret
############ end of mygene.info ################
def report_errors():
if not consts['errors']:
return
print ('Errors:')
for k,v in consts['errors'].items():
print (consts['messages'][k],v)
if __name__ == '__main__':
'''
rs429358 . APOE
'''
configure_regexp()
parse_gff3(consts['gencode'])
index_transcripts()
index_genes()
parser = argparse.ArgumentParser(description='The best python project ever')
parser.add_argument('-q', '--question', help='Your question in english', required=False)
args = parser.parse_args()
question = args.question
if question:
answer(question)
report_errors()
#test_isoform_map()
Display the source blob
Display the rendered blob
Raw
{
"cells": [
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
},
{
"cell_type": "code",
"execution_count": 131,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Start parsing: gencode.v29.annotation.gff3.gz\n",
"Lines: 10000\n",
"Lines: 20000\n",
"Lines: 30000\n",
"Lines: 40000\n",
"Lines: 50000\n",
"Lines: 60000\n",
"Lines: 70000\n",
"Lines: 80000\n",
"Lines: 90000\n",
"Lines: 100000\n",
"Lines: 110000\n",
"Lines: 120000\n",
"Lines: 130000\n",
"Lines: 140000\n",
"Lines: 150000\n",
"Lines: 160000\n",
"Lines: 170000\n",
"Lines: 180000\n",
"Lines: 190000\n",
"Lines: 200000\n",
"Lines: 210000\n",
"Lines: 220000\n",
"Lines: 230000\n",
"Lines: 240000\n",
"Lines: 250000\n",
"Done parsing: gencode.v29.annotation.gff3.gz\n"
]
}
],
"source": [
"import re\n",
"import gzip\n",
"from collections import defaultdict\n",
"\n",
"consts = {\n",
" 'gencode': 'gencode.v29.annotation.gff3.gz',\n",
" 'transcripts': {}, # Transcript data\n",
"}\n",
"\n",
"\n",
"\n",
"def parse_gff3_line(l):\n",
" ls = l.split()\n",
" \n",
" #Read chromsome \n",
" chromosome = ls[0]\n",
" # Make sure that chromosome is 'chr\\d+'\n",
" assert re.match(r'chr[\\dXY]+', chromosome)\n",
" \n",
" #Annotation source\n",
" source= ls[1]\n",
" #Make sure that this is either 'HAVANA' or 'ENSEMBL'\n",
" assert source in ['HAVANA', 'ENSEMBL']\n",
" \n",
" #Type of the regulatory element (gene, transcript, exon, ...) \n",
" t_type = ls[2]\n",
" if t_type in ['stop_codon_redefined_as_selenocysteine']:\n",
" return None\n",
" \n",
" #Read start and end\n",
" start = int(ls[3])\n",
" end = int(ls[4])\n",
" \n",
" #Read annotation\n",
" #For example: ID=exon:ENST00000450305.2:1;Parent=ENST00000450305.2;gene_id=ENSG00000223972.5 ..\n",
" annotation = {key:value for key,value in [x.split('=') for x in ls[8].split(';')]}\n",
" \n",
" return {\n",
" 'chromosome': chromosome,\n",
" 'source': source,\n",
" 'type': t_type,\n",
" 'start': start,\n",
" 'end': end,\n",
" 'annotation': annotation\n",
" }\n",
" \n",
"def decode_appris(appris):\n",
" '''\n",
" Return a integer representation of appris\n",
" The lower the better.\n",
" '''\n",
" \n",
" return {\n",
" 'appris_principal_1': 0,\n",
" 'appris_principal_2': 1,\n",
" 'appris_principal_3': 2,\n",
" 'appris_principal_4': 3,\n",
" 'appris_principal_5': 4,\n",
" 'appris_alternative_1':5,\n",
" 'appris_alternative_2':6,\n",
" }[appris]\n",
"\n",
"def decode_chromosome(chromosome):\n",
" '''\n",
" Return an int representation of a chromosome\n",
" '''\n",
" m = re.match(r'chr([\\d]+)', chromosome)\n",
" if m:\n",
" return int(m.group(1))\n",
" \n",
" raise Exception('Cannot decode chromosome: {}'.format(chromosome))\n",
"\n",
"def store_transcript_data(gff3_data):\n",
" \n",
" chromosome = decode_chromosome(gff3_data['chromosome'])\n",
" \n",
" annotations = gff3_data['annotation']\n",
" transcript_id = annotations['transcript_id']\n",
" gene = annotations['gene_name']\n",
" t_type = gff3_data['type']\n",
"\n",
" start = gff3_data['start']\n",
" end = gff3_data['end']\n",
" \n",
" exon_number = int(annotations.get('exon_number', -1))\n",
" #Check that exon number makes sense...\n",
" if t_type in ['CDS', 'exon', 'start_codon', 'stop_codon', 'three_prime_UTR', 'five_prime_UTR']:\n",
" assert exon_number>0\n",
" location_tuple = (start, end, exon_number)\n",
" \n",
" \n",
" #Read appris information\n",
" appris = [x for x in annotations['tag'].split(',') if 'appris' in x]\n",
" assert len(appris) == 1 # Cannot have more than one appries..\n",
" appris = appris[0]\n",
" \n",
" #print (transcript_id)\n",
" \n",
" transcripts = consts['transcripts']\n",
" \n",
" if not transcript_id in transcripts:\n",
" transcripts[transcript_id] = {\n",
" 'gene': gene,\n",
" 'chr': chromosome,\n",
" 'start': None,\n",
" 'end': None,\n",
" 'exons': [],\n",
" 'cds': [],\n",
" '5utr': [],\n",
" '3utr': [],\n",
" 'start_c': [], # Start codon\n",
" 'stop_c': [], # Stop codon\n",
" 'appris': None,\n",
" }\n",
" \n",
" transcript_info = transcripts[transcript_id]\n",
" \n",
" #Some quality control\n",
" assert transcript_info['gene'] == gene\n",
" \n",
" #Store transcript information in relevant fields\n",
" if t_type == 'transcript':\n",
" transcript_info['appris'] = decode_appris(appris)\n",
" assert transcript_info['start'] is None\n",
" assert transcript_info['end'] is None\n",
" transcript_info['start'] = start\n",
" transcript_info['end'] = end\n",
" elif t_type == 'exon':\n",
" transcript_info['exons'].append(location_tuple)\n",
" elif t_type == 'CDS':\n",
" transcript_info['cds'].append(location_tuple)\n",
" elif t_type == 'start_codon':\n",
" #assert transcript_info['start_c'] is None # Cannot have two start codons.. Yet.. some.. have\n",
" transcript_info['start_c'].append(location_tuple)\n",
" elif t_type == 'stop_codon':\n",
" #assert transcript_info['stop_c'] is None # Cannot have two stop codons..\n",
" transcript_info['stop_c'].append(location_tuple)\n",
" elif t_type == 'five_prime_UTR':\n",
" transcript_info['5utr'].append(location_tuple)\n",
" elif t_type == 'three_prime_UTR':\n",
" transcript_info['3utr'].append(location_tuple) \n",
" else:\n",
" raise Exception('Unknown regulatory element: {}'.format(t_type))\n",
" \n",
"\n",
"def parse_gff3(fn):\n",
" '''\n",
" Parse gff3 file\n",
" '''\n",
" print ('Start parsing:', fn)\n",
" with gzip.open(fn, 'rt') as f:\n",
" line_counter = 0\n",
" for l in f:\n",
" line_counter += 1\n",
" \n",
" if line_counter % 10000 == 0:\n",
" print ('Lines:', line_counter)\n",
" \n",
" if l[0] == '#': # This is a comment\n",
" continue\n",
" \n",
" if not 'appris' in l: # Ignore transcripts without appris information\n",
" continue\n",
"\n",
" gff3_data = parse_gff3_line(l)\n",
" if not gff3_data:\n",
" continue\n",
" store_transcript_data(gff3_data)\n",
" \n",
"\n",
" #if line_counter > 10000:\n",
" # break\n",
" if gff3_data['chromosome'] != 'chr1':\n",
" break\n",
" print ('Done parsing:', fn)\n",
"\n",
"\n",
"parse_gff3(consts['gencode'])\n"
]
},
{
"cell_type": "code",
"execution_count": 42,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"['ENST00000335137.4',\n",
" 'ENST00000426406.3',\n",
" 'ENST00000332831.4',\n",
" 'ENST00000342066.7',\n",
" 'ENST00000618181.4',\n",
" 'ENST00000622503.4',\n",
" 'ENST00000618779.4',\n",
" 'ENST00000616125.4',\n",
" 'ENST00000617307.4',\n",
" 'ENST00000327044.6',\n",
" 'ENST00000338591.7',\n",
" 'ENST00000379410.7',\n",
" 'ENST00000379409.6',\n",
" 'ENST00000379407.7',\n",
" 'ENST00000341290.6',\n",
" 'ENST00000433179.3',\n",
" 'ENST00000304952.10',\n",
" 'ENST00000624697.4',\n",
" 'ENST00000379389.4',\n",
" 'ENST00000649529.1',\n",
" 'ENST00000379370.6',\n",
" 'ENST00000453464.2',\n",
" 'ENST00000379339.5',\n",
" 'ENST00000421241.6',\n",
" 'ENST00000379320.5',\n",
" 'ENST00000379319.5',\n",
" 'ENST00000379290.5',\n",
" 'ENST00000379289.5',\n",
" 'ENST00000328596.10',\n",
" 'ENST00000379268.6',\n",
" 'ENST00000486728.5',\n",
" 'ENST00000379265.5',\n",
" 'ENST00000379236.3',\n",
" 'ENST00000360001.10',\n",
" 'ENST00000263741.11',\n",
" 'ENST00000647651.1',\n",
" 'ENST00000379198.4',\n",
" 'ENST00000330388.2',\n",
" 'ENST00000349431.10',\n",
" 'ENST00000360466.6',\n",
" 'ENST00000379116.9',\n",
" 'ENST00000338555.6',\n",
" 'ENST00000400928.7',\n",
" 'ENST00000325425.12',\n",
" 'ENST00000354700.9',\n",
" 'ENST00000379031.9',\n",
" 'ENST00000435064.5',\n",
" 'ENST00000343938.8',\n",
" 'ENST00000339381.5',\n",
" 'ENST00000378891.9',\n",
" 'ENST00000378888.9',\n",
" 'ENST00000309212.10',\n",
" 'ENST00000338370.7',\n",
" 'ENST00000338338.9',\n",
" 'ENST00000321751.9',\n",
" 'ENST00000378853.3',\n",
" 'ENST00000400809.7',\n",
" 'ENST00000344843.11',\n",
" 'ENST00000537107.5',\n",
" 'ENST00000454272.2',\n",
" 'ENST00000378821.3',\n",
" 'ENST00000476993.1',\n",
" 'ENST00000378785.6',\n",
" 'ENST00000308647.7',\n",
" 'ENST00000378756.7',\n",
" 'ENST00000378755.9',\n",
" 'ENST00000536055.5',\n",
" 'ENST00000378733.8',\n",
" 'ENST00000425828.1',\n",
" 'ENST00000291386.3',\n",
" 'ENST00000422725.3',\n",
" 'ENST00000520777.5',\n",
" 'ENST00000355826.9',\n",
" 'ENST00000505820.6',\n",
" 'ENST00000378708.5',\n",
" 'ENST00000356026.9',\n",
" 'ENST00000341832.10',\n",
" 'ENST00000407249.7',\n",
" 'ENST00000340677.9',\n",
" 'ENST00000629289.2',\n",
" 'ENST00000626918.2',\n",
" 'ENST00000629312.2',\n",
" 'ENST00000617444.4',\n",
" 'ENST00000611123.1',\n",
" 'ENST00000356200.7',\n",
" 'ENST00000378638.6',\n",
" 'ENST00000404249.7',\n",
" 'ENST00000357760.6',\n",
" 'ENST00000358779.9',\n",
" 'ENST00000378633.5',\n",
" 'ENST00000355439.7',\n",
" 'ENST00000643905.1',\n",
" 'ENST00000341426.9',\n",
" 'ENST00000341991.7',\n",
" 'ENST00000378625.5',\n",
" 'ENST00000342348.9',\n",
" 'ENST00000610897.4',\n",
" 'ENST00000378609.8',\n",
" 'ENST00000307786.7',\n",
" 'ENST00000378604.3',\n",
" 'ENST00000378602.3',\n",
" 'ENST00000310991.7',\n",
" 'ENST00000493964.5',\n",
" 'ENST00000378585.6',\n",
" 'ENST00000640067.1',\n",
" 'ENST00000638771.1',\n",
" 'ENST00000378567.7',\n",
" 'ENST00000378546.8',\n",
" 'ENST00000378536.4',\n",
" 'ENST00000378531.7',\n",
" 'ENST00000378529.7',\n",
" 'ENST00000605895.5',\n",
" 'ENST00000488353.2',\n",
" 'ENST00000447513.6',\n",
" 'ENST00000507596.5',\n",
" 'ENST00000449969.5',\n",
" 'ENST00000419816.6',\n",
" 'ENST00000378486.7',\n",
" 'ENST00000378466.7',\n",
" 'ENST00000378453.3',\n",
" 'ENST00000409119.5',\n",
" 'ENST00000355716.4',\n",
" 'ENST00000419916.7',\n",
" 'ENST00000378412.7',\n",
" 'ENST00000401095.8']"
]
},
"execution_count": 42,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"list(consts['transcripts'].keys())"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"{'gene': 'PERM1',\n",
" 'chr': 1,\n",
" 'start': 975205,\n",
" 'end': 981029,\n",
" 'exons': [(978881, 981029, 1), (976499, 976624, 2), (975205, 976269, 3)],\n",
" 'cds': [(978881, 981029, 1), (976499, 976624, 2), (976172, 976269, 3)],\n",
" '5utr': [],\n",
" '3utr': [(975205, 976171, 3)],\n",
" 'start_c': (981027, 981029, 1),\n",
" 'stop_c': (976172, 976174, 3),\n",
" 'appris': 1}"
]
},
"execution_count": 2,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"consts['transcripts']['ENST00000433179.3']"
]
},
{
"cell_type": "code",
"execution_count": 158,
"metadata": {
"scrolled": true
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Generating all possible isoforms!\n",
"Isoforms: 0\n",
"Isoforms: 100\n",
"Isoforms: 200\n",
"Isoforms: 300\n",
"Isoforms: 400\n",
"Isoforms: 500\n",
"Isoforms: 600\n",
"Isoforms: 700\n",
"Isoforms: 800\n",
"Isoforms: 900\n",
"Isoforms: 1000\n",
"Isoforms: 1100\n",
"Isoforms: 1200\n",
"Isoforms: 1300\n",
"Isoforms: 1400\n",
"Isoforms: 1500\n",
"Isoforms: 1600\n",
"Isoforms: 1700\n",
"Isoforms: 1800\n",
"Isoforms: 1900\n",
"Isoforms: 2000\n",
"Isoforms: 2100\n",
"Isoforms: 2200\n",
"Isoforms: 2300\n",
"Isoforms: 2400\n",
"Isoforms: 2500\n",
"Isoforms: 2600\n",
"Isoforms: 2700\n",
"Isoforms: 2800\n",
"Isoforms: 2900\n",
"Isoforms: 3000\n",
"Isoforms: 3100\n",
"Isoforms: 3200\n",
"Isoforms: 3300\n",
"Isoforms: 3400\n",
"Isoforms: 3500\n",
"Isoforms: 3600\n",
"Testing done. Total: 3673\n"
]
}
],
"source": [
"def space_check(space_1, space_2):\n",
" '''\n",
" Returns True if space1 is BEFORE space_2\n",
" Returns False is space1 is AFTER space_2\n",
" Throws excpetion in case of overlap\n",
" '''\n",
" \n",
" if space_1[0] <= space_1[1] <= space_2[0] <= space_2[1]:\n",
" return True\n",
" \n",
" if space_2[0] <= space_2[0] and space_1[0] <= space_1[1]:\n",
" return False\n",
" \n",
" raise Exception('Space overlap: {} {}'.format(space_1, space_2))\n",
"\n",
"def exon_map(exon, cds, utr_3, utr_5):\n",
" '''\n",
" Return a box for each exon\n",
" Return the positions of the edges of the boxes\n",
" '''\n",
" exon_number = exon[2]\n",
" if utr_3 is None and utr_5 is None:\n",
" assert exon == cds # If not utrs present exon and cds should coincide\n",
" return exon_str_1(exon_number, 'cds') , (exon[0], exon[1])\n",
" \n",
" elif cds is None and utr_5 is None:\n",
" assert exon == utr_3\n",
" return exon_str_1(exon_number, 'utr3'), (exon[0], exon[1])\n",
" \n",
" elif cds is None and utr_3 is None:\n",
" assert exon == utr_5\n",
" return exon_str_1(exon_number, 'utr5'), (exon[0], exon[1])\n",
" \n",
" elif utr_3 is None:\n",
" assert utr_5 # Redundant but ok..\n",
" #Which is first, CDS or UTR5?\n",
" s_check = space_check(cds, utr_5)\n",
" if s_check:\n",
" assert exon[0] == cds[0]\n",
" assert cds[1] == utr_5[0] - 1\n",
" assert exon[1] == utr_5[1]\n",
" return exon_str_2(exon_number, 'cds', 'utr5'), (exon[0], cds[1], exon[1])\n",
" else:\n",
" assert exon[0] == utr_5[0]\n",
" assert utr_5[1] == cds[0] - 1\n",
" assert exon[1] == cds[1]\n",
" return exon_str_2(exon_number, 'utr5', 'cds'), (exon[0], utr_5[1], exon[1])\n",
" elif utr_5 is None:\n",
" assert utr_3 # Redundancy is bliss..\n",
" s_check = space_check(cds, utr_3)\n",
" if s_check:\n",
" assert exon[0] == cds[0]\n",
" assert cds[1] == utr_3[0] - 1\n",
" assert exon[1] == utr_3[1]\n",
" return exon_str_2(exon_number, 'cds', 'utr3'), (exon[0], cds[0], exon[1])\n",
" else:\n",
" assert exon[0] == utr_3[0]\n",
" assert utr_3[1] == cds[0] - 1\n",
" assert exon[1] == cds[1]\n",
" return exon_str_2(exon_number, 'utr3', 'cds'), (exon[0], utr_3[1], exon[1])\n",
" else:\n",
" assert cds\n",
" assert utr_3\n",
" assert utr_5\n",
" # 69055, 70108, 1) (69091, 70008, 1) (70009, 70108, 1) (69055, 69090, 1)\n",
" # Which is first, utr_3 or utr_5 ?\n",
" s_check = space_check(utr_3, utr_5)\n",
" if s_check:\n",
" assert utr_3[0] <= utr_3[1] <= cds[0] <= cds[1] <= utr_5[0] <= utr_5[1]\n",
" return exon_str_3(exon_number, 'utr3', 'cds', 'utr5'), (exon[0], cds[0], utr_5[0], exon[1])\n",
" else:\n",
" assert utr_5[0] <= utr_5[1] <= cds[0] <= cds[1] <= utr_3[0] <= utr_3[1]\n",
" return exon_str_3(exon_number, 'utr5', 'cds', 'utr3'), (exon[0], cds[0], utr_3[0], exon[1])\n",
" \n",
" raise Exception('This should never happen..')\n",
"\n",
"\n",
"def isoform_map(transcript):\n",
" tr_data = consts['transcripts'][transcript]\n",
" \n",
" def element_getter(element, exon_number):\n",
" ret = [x for x in tr_data[element] if x[2]==exon_number]\n",
" if len(ret) > 1:\n",
" raise Exception('Transcript: {} exon_number: {} has more than one: {}'.\n",
" format(transcript, exon_number, element))\n",
" \n",
" if ret:\n",
" return ret[0]\n",
" return None # Do not have do.. but always nice and readable\n",
" \n",
" #Order exons accorsing to position\n",
" exons_ordered = sorted(tr_data['exons'])\n",
" \n",
" boxes = []\n",
" positions = []\n",
" for exon in exons_ordered:\n",
" exon_number = exon[2]\n",
" #Get all elements of the exon\n",
" cds = element_getter('cds', exon_number)\n",
" utr_3 = element_getter('3utr', exon_number)\n",
" utr_5 = element_getter('5utr', exon_number)\n",
" \n",
" if boxes:\n",
" boxes.append(exon_join())\n",
" exon_box, exon_positions = exon_map(exon, cds, utr_3, utr_5)\n",
" boxes.append(exon_box)\n",
" positions.append(exon_positions)\n",
" \n",
" \n",
" boxes = [x.split('\\n')[1:] for x in boxes] # Remove first line\n",
" assert len({len(x) for x in boxes}) <= 1 # Each \"box\" should have the same number of lines\n",
" boxes_str = '\\n'.join([' '+''.join(x) for x in zip(*boxes)]) # \"Zip\" boxes together\n",
"# print (boxes_str)\n",
" boxes_str = boxes_str[:-5] # Remove enter at the end of boxes\n",
" \n",
" positions_str = '' + ' '.join([format_pos(y) for x in positions for y in x])\n",
"# print (positions_str)\n",
"\n",
" complete_str = boxes_str + positions_str\n",
" return complete_str\n",
"\n",
" #print (tr_data)\n",
" \n",
"#isoform_map('ENST00000433179.3')\n",
"\n",
"def test_isoform_map():\n",
" '''\n",
" For all transcripts check isoform maps\n",
" '''\n",
" print ('Generating all possible isoforms!')\n",
" d= open('del.isoforms', 'w')\n",
" \n",
" for counter, transcript in enumerate(consts['transcripts']):\n",
" if counter % 100 == 0:\n",
" print ('Isoforms:', counter)\n",
" m = isoform_map(transcript)\n",
" d.write(m)\n",
" d.write('\\n')\n",
" #print (m)\n",
" if counter > 10000:\n",
" break\n",
" print ('Testing done. Total: ', counter)\n",
" d.close()\n",
"test_isoform_map()\n"
]
},
{
"cell_type": "code",
"execution_count": 144,
"metadata": {},
"outputs": [],
"source": [
"\n",
"\n",
"def int_commas(i):\n",
" '''\n",
" 1234 --> 1,234\n",
" 12345 --> 12,345\n",
" 123456 --> 123,456\n",
" 1234567 --> 1,234,567\n",
" ...\n",
" '''\n",
" s = str(i)\n",
" return ','.join([s[::-1][x:x+3][::-1] for x in range(0,len(s),3)][::-1])\n",
" # return ','.join([s[0 if x-3<0 else x-3:x] for x in range(len(s),0,-3)][::-1]) # Another way\n",
"\n",
"def format_pos(p):\n",
" '''\n",
" padding.\n",
" Making sure that the string reporesentation of \n",
" position takes always exactly 11 characters \n",
" The padding is equal-ish to the left and right\n",
" '''\n",
" s = int_commas(p)\n",
" padding = 11-len(s)\n",
" padding_left = padding//2\n",
" padding_right = padding - padding_left \n",
" ret = (' ' * padding_left) + s + (' ' * padding_right)\n",
" assert len(ret) == 11\n",
" return ret\n",
"\n",
"def format_exon(p):\n",
" '''\n",
" exon number\n",
" '''\n",
" if p < 10:\n",
" return str(p) + ' '\n",
" return str(p) \n",
" \n",
"def exon_box_tag(name):\n",
" d = {\n",
" '5utr': '5\\'UTR',\n",
" '3utr': '3\\'UTR',\n",
" 'utr5': '5\\'UTR',\n",
" 'utr3': '3\\'UTR',\n",
" 'cds' : ' CDS'\n",
" }\n",
" \n",
" return d[name]\n",
"\n",
"\n",
"def exon_str_1(number, first):\n",
" '''\n",
" print (exon_str_1(50))\n",
" '''\n",
" \n",
" return '''\n",
"╔══════════════╗\n",
"║ Exon {} ║\n",
"╟──────────────╢\n",
"║ {} ║\n",
"╟──────────────╢\n",
"║ ║\n",
"║ ║\n",
"'''.format(format_exon(number), exon_box_tag(first))\n",
"\n",
"\n",
"def exon_str_2(number, first, second):\n",
" '''\n",
"print (exon_str_2(4, '3utr', 'cds'))\n",
"print (exon_str_2(4, 'cds', '5utr'))\n",
"print (exon_str_2(4, 'cds', 'cds'))\n",
"print (exon_str_2(4, '3utr', '3utr'))\n",
" '''\n",
" \n",
" return '''\n",
"╔═════════════════════════════╗\n",
"║ Exon {} ║\n",
"╟──────────────┬──────────────╢\n",
"║ {} │ {} ║\n",
"╟──────────────┼──────────────╢\n",
"║ │ ║\n",
"║ │ ║\n",
"'''.format(format_exon(number), exon_box_tag(first), exon_box_tag(second))\n",
"\n",
"\n",
"def exon_str_3(number, first, second, third):\n",
" \n",
" return '''\n",
"╔════════════════════════════════════════════╗\n",
"║ Exon {} ║\n",
"╟──────────────┬──────────────┬──────────────╢\n",
"║ {} │ {} │ {} ║\n",
"╟──────────────┼──────────────┼──────────────╢\n",
"║ │ │ ║\n",
"║ │ │ ║\n",
"'''.format(format_exon(number), exon_box_tag(first), exon_box_tag(second), exon_box_tag(third))\n",
"\n",
"def exon_join():\n",
" '''\n",
" Oh christmas tree... \n",
" '''\n",
" return '''\n",
" ╱╲ \n",
" ╱ ╲ \n",
" ╱ ╲ \n",
" ╱ ╲ \n",
" ╱ ╲ \n",
" ╱ ╲ \n",
"╱ ╲\n",
"'''\n"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.7.0"
}
},
"nbformat": 4,
"nbformat_minor": 2
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment