Skip to content

Instantly share code, notes, and snippets.

@kantale
Last active April 3, 2020 17:33
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save kantale/0d11071991a6cfaa23775f564e1a27d7 to your computer and use it in GitHub Desktop.
Save kantale/0d11071991a6cfaa23775f564e1a27d7 to your computer and use it in GitHub Desktop.
Υλοποίηση του project στο μάθημα "Εισαγωγή στον προγραμματισμό με τη γλώσσα python" 2018-2019
import re
import copy
import gzip
import json
import bisect
import argparse
import requests
from collections import defaultdict, OrderedDict
from pyliftover import LiftOver
'''
Create environment
/Users/alexandroskanterakis/anaconda3/bin/conda create -n python_project python=3.7
Start environment
source /Users/alexandroskanterakis/anaconda3/bin/activate python_project
pip install requests
pip install pyliftover
Deactive
source deactivate
Remove environment
/Users/alexandroskanterakis/anaconda3/bin/conda remove --name python_project --all
'''
consts = {
'gencode': 'gencode.v29.annotation.gff3.gz',
'transcripts': {}, # Transcript data
'lo': LiftOver('hg19', 'hg38'),
'lo_inv': LiftOver('hg38', 'hg19'),
'errors': defaultdict(int),
'messages': {
'pseudoautosomal_lines' : 'pseudoautosomal genes were ignored. Number of lines found:',
'pseudoautosomal_transcripts': 'pseudoautosomal genes were ignored. Number of transcripts found:',
},
'starts': defaultdict(list),
'ends': defaultdict(list),
'transcripts_sorted': defaultdict(list),
}
def lift(chromosome, position, inverse=False):
if type(chromosome) is str:
if not 'chr' in chromosome:
chromosome = 'chr' + chromosome
elif type(chromosome) is int:
chromosome = 'chr' + str(chromosome)
else:
raise Exception('Unknown type of parameter chromosome: {}'.format(type(chromosome).__name__))
if inverse:
s = consts['lo_inv'].convert_coordinate(chromosome, position)
else:
s = consts['lo'].convert_coordinate(chromosome, position)
if len(s) > 1:
print ('WARNING: liftover returned more than one result. Taking the first..')
if len(s) == 0:
raise Exception('Liftover failed..')
return s[0][1]
def parse_gff3_line(l):
ls = l.split()
#Read chromsome
chromosome = ls[0]
# Make sure that chromosome is 'chr\d+'
assert re.match(r'chr[\dXYMT]+', chromosome)
#Annotation source
source= ls[1]
#Make sure that this is either 'HAVANA' or 'ENSEMBL'
assert source in ['HAVANA', 'ENSEMBL']
#Type of the regulatory element (gene, transcript, exon, ...)
t_type = ls[2]
if t_type in ['stop_codon_redefined_as_selenocysteine']:
return None
#Read start and end
start = int(ls[3])
end = int(ls[4])
#Read annotation
#For example: ID=exon:ENST00000450305.2:1;Parent=ENST00000450305.2;gene_id=ENSG00000223972.5 ..
annotation = {key:value for key,value in [x.split('=') for x in ls[8].split(';')]}
return {
'chromosome': chromosome,
'source': source,
'type': t_type,
'start': start,
'end': end,
'annotation': annotation
}
def decode_appris(appris):
'''
Return a integer representation of appris
The lower the better.
'''
return {
'appris_principal_1': 0,
'appris_principal_2': 1,
'appris_principal_3': 2,
'appris_principal_4': 3,
'appris_principal_5': 4,
'appris_alternative_1':5,
'appris_alternative_2':6,
}[appris]
def take_best_appris(transcripts):
'''
From a set of transcripts the one with the lowest apris code
'''
s = sorted(transcripts, key=lambda x: consts['transcripts'][x]['appris'])
return s[0] # FIX ME !!! MIGHT BE MORE THAN LOWEST!!!!
def decode_chromosome(chromosome):
'''
Return an int representation of a chromosome
'''
m = re.match(r'chr([\dXYMT]+)', chromosome)
if m:
return m.group(1)
raise Exception('Cannot decode chromosome: {}'.format(chromosome))
def store_transcript_data(gff3_data):
chromosome = decode_chromosome(gff3_data['chromosome'])
annotations = gff3_data['annotation']
transcript_id = annotations['transcript_id']
gene = annotations['gene_name']
t_type = gff3_data['type']
if '_PAR_' in annotations['ID']:
# These are pseudoautosomal genes
# They have the same transcript id and exist in both X and Y chromosomes..
consts['errors']['pseudoautosomal_lines'] += 1
if t_type == 'transcript':
consts['errors']['pseudoautosomal_transcripts'] += 1
return
start = gff3_data['start']
end = gff3_data['end']
exon_number = int(annotations.get('exon_number', -1))
#Check that exon number makes sense...
if t_type in ['CDS', 'exon', 'start_codon', 'stop_codon', 'three_prime_UTR', 'five_prime_UTR']:
assert exon_number>0
location_tuple = (start, end, exon_number)
#Read appris information
appris = [x for x in annotations['tag'].split(',') if 'appris' in x]
assert len(appris) == 1 # Cannot have more than one appries..
appris = appris[0]
#print (transcript_id)
transcripts = consts['transcripts']
if not transcript_id in transcripts:
transcripts[transcript_id] = {
'gene': gene,
'chr': chromosome,
'start': None,
'end': None,
'exons': [],
'cds': [],
'5utr': [],
'3utr': [],
'start_c': [], # Start codon
'stop_c': [], # Stop codon
'appris': None,
}
transcript_info = transcripts[transcript_id]
#Some quality control
assert transcript_info['gene'] == gene
#Store transcript information in relevant fields
if t_type == 'transcript':
assert transcript_info['appris'] is None
transcript_info['appris'] = decode_appris(appris)
assert transcript_info['start'] is None
assert transcript_info['end'] is None
transcript_info['start'] = start
transcript_info['end'] = end
elif t_type == 'exon':
transcript_info['exons'].append(location_tuple)
elif t_type == 'CDS':
transcript_info['cds'].append(location_tuple)
elif t_type == 'start_codon':
#assert transcript_info['start_c'] is None # Cannot have two start codons.. Yet.. some.. have
transcript_info['start_c'].append(location_tuple)
elif t_type == 'stop_codon':
#assert transcript_info['stop_c'] is None # Cannot have two stop codons..
transcript_info['stop_c'].append(location_tuple)
elif t_type == 'five_prime_UTR':
transcript_info['5utr'].append(location_tuple)
elif t_type == 'three_prime_UTR':
transcript_info['3utr'].append(location_tuple)
else:
raise Exception('Unknown regulatory element: {}'.format(t_type))
def parse_gff3(fn):
'''
Parse gff3 file
'''
print ('Start parsing:', fn)
with gzip.open(fn, 'rt') as f:
line_counter = 0
for l in f:
line_counter += 1
if line_counter % 100000 == 0:
print ('Lines: {0} {1:.2f}%'.format(line_counter, 100*line_counter/2739404))
if l[0] == '#': # This is a comment
continue
if not 'appris' in l: # Ignore transcripts without appris information
continue
gff3_data = parse_gff3_line(l)
if not gff3_data:
continue
store_transcript_data(gff3_data)
#if line_counter > 10000:
# break
#if gff3_data['chromosome'] != 'chr1':
# break
print ('Done parsing:', fn)
def int_commas(i):
'''
1234 --> 1,234
12345 --> 12,345
123456 --> 123,456
1234567 --> 1,234,567
...
'''
s = str(i)
return ','.join([s[::-1][x:x+3][::-1] for x in range(0,len(s),3)][::-1])
# return ','.join([s[0 if x-3<0 else x-3:x] for x in range(len(s),0,-3)][::-1]) # Another way
def format_pos(p):
'''
padding.
Making sure that the string reporesentation of
position takes always exactly 11 characters
The padding is equal-ish to the left and right
'''
s = int_commas(p)
padding = 11-len(s)
padding_left = padding//2
padding_right = padding - padding_left
ret = (' ' * padding_left) + s + (' ' * padding_right)
assert len(ret) == 11
return ret
def format_exon(p):
'''
exon number
'''
if p < 10:
return str(p) + ' '
return str(p)
def exon_box_tag(name):
d = {
'5utr': '5\'UTR',
'3utr': '3\'UTR',
'utr5': '5\'UTR',
'utr3': '3\'UTR',
'cds' : ' CDS'
}
return d[name]
def exon_str_1(number, first):
'''
print (exon_str_1(50))
'''
return '''
╔══════════════╗
║ Exon {} ║
╟──────────────╢
║ {} ║
╟──────────────╢
║ ║
║ ║
'''.format(format_exon(number), exon_box_tag(first))
def exon_str_2(number, first, second):
'''
print (exon_str_2(4, '3utr', 'cds'))
print (exon_str_2(4, 'cds', '5utr'))
print (exon_str_2(4, 'cds', 'cds'))
print (exon_str_2(4, '3utr', '3utr'))
'''
return '''
╔═════════════════════════════╗
║ Exon {} ║
╟──────────────┬──────────────╢
║ {} │ {} ║
╟──────────────┼──────────────╢
║ │ ║
║ │ ║
'''.format(format_exon(number), exon_box_tag(first), exon_box_tag(second))
def exon_str_3(number, first, second, third):
return '''
╔════════════════════════════════════════════╗
║ Exon {} ║
╟──────────────┬──────────────┬──────────────╢
║ {} │ {} │ {} ║
╟──────────────┼──────────────┼──────────────╢
║ │ │ ║
║ │ │ ║
'''.format(format_exon(number), exon_box_tag(first), exon_box_tag(second), exon_box_tag(third))
def exon_join():
'''
Oh christmas tree...
'''
return '''
╱╲
╱ ╲
╱ ╲
╱ ╲
╱ ╲
╱ ╲
╱ ╲
'''
def space_check(space_1, space_2):
'''
Returns True if space1 is BEFORE space_2
Returns False is space1 is AFTER space_2
Throws excpetion in case of overlap
'''
if space_1[0] <= space_1[1] <= space_2[0] <= space_2[1]:
return True
if space_2[0] <= space_2[0] and space_1[0] <= space_1[1]:
return False
raise Exception('Space overlap: {} {}'.format(space_1, space_2))
def exon_map(exon, cds, utr_3, utr_5):
'''
Return a box for each exon
Return the positions of the edges of the boxes
'''
exon_number = exon[2]
if utr_3 is None and utr_5 is None:
assert exon == cds # If not utrs present exon and cds should coincide
return exon_str_1(exon_number, 'cds') , (exon[0], exon[1])
elif cds is None and utr_5 is None:
assert exon == utr_3
return exon_str_1(exon_number, 'utr3'), (exon[0], exon[1])
elif cds is None and utr_3 is None:
assert exon == utr_5
return exon_str_1(exon_number, 'utr5'), (exon[0], exon[1])
elif utr_3 is None:
assert utr_5 # Redundant but ok..
#Which is first, CDS or UTR5?
s_check = space_check(cds, utr_5)
if s_check:
assert exon[0] == cds[0]
assert cds[1] == utr_5[0] - 1
assert exon[1] == utr_5[1]
return exon_str_2(exon_number, 'cds', 'utr5'), (exon[0], cds[1], exon[1])
else:
assert exon[0] == utr_5[0]
assert utr_5[1] == cds[0] - 1
assert exon[1] == cds[1]
return exon_str_2(exon_number, 'utr5', 'cds'), (exon[0], utr_5[1], exon[1])
elif utr_5 is None:
assert utr_3 # Redundancy is bliss..
s_check = space_check(cds, utr_3)
if s_check:
assert exon[0] == cds[0]
assert cds[1] == utr_3[0] - 1
assert exon[1] == utr_3[1]
return exon_str_2(exon_number, 'cds', 'utr3'), (exon[0], cds[0], exon[1])
else:
assert exon[0] == utr_3[0]
assert utr_3[1] == cds[0] - 1
assert exon[1] == cds[1]
return exon_str_2(exon_number, 'utr3', 'cds'), (exon[0], utr_3[1], exon[1])
else:
assert cds
assert utr_3
assert utr_5
# 69055, 70108, 1) (69091, 70008, 1) (70009, 70108, 1) (69055, 69090, 1)
# Which is first, utr_3 or utr_5 ?
s_check = space_check(utr_3, utr_5)
if s_check:
assert utr_3[0] <= utr_3[1] <= cds[0] <= cds[1] <= utr_5[0] <= utr_5[1]
return exon_str_3(exon_number, 'utr3', 'cds', 'utr5'), (exon[0], cds[0], utr_5[0], exon[1])
else:
assert utr_5[0] <= utr_5[1] <= cds[0] <= cds[1] <= utr_3[0] <= utr_3[1]
return exon_str_3(exon_number, 'utr5', 'cds', 'utr3'), (exon[0], cds[0], utr_3[0], exon[1])
raise Exception('This should never happen..')
def isoform_map(transcript, position=None):
'''
position: position to Highligh!
'''
tr_data = consts['transcripts'][transcript]
def element_getter(element, exon_number):
ret = [x for x in tr_data[element] if x[2]==exon_number]
if len(ret) > 1:
raise Exception('Transcript: {} exon_number: {} has more than one: {}'.
format(transcript, exon_number, element))
if ret:
return ret[0]
return None # Do not have do.. but always nice and readable
#Order exons accorsing to position
exons_ordered = sorted(tr_data['exons'])
boxes = []
positions = []
for exon in exons_ordered:
exon_number = exon[2]
#Get all elements of the exon
cds = element_getter('cds', exon_number)
utr_3 = element_getter('3utr', exon_number)
utr_5 = element_getter('5utr', exon_number)
if boxes:
boxes.append(exon_join())
exon_box, exon_positions = exon_map(exon, cds, utr_3, utr_5)
boxes.append(exon_box)
positions.append(exon_positions)
boxes = [x.split('\n')[1:] for x in boxes] # Remove first line
assert len({len(x) for x in boxes}) <= 1 # Each "box" should have the same number of lines
boxes_str = '\n'.join([' '+''.join(x) for x in zip(*boxes)]) # "Zip" boxes together
# print (boxes_str)
boxes_str = boxes_str[:-5] # Remove enter at the end of boxes
all_positions = [y for x in positions for y in x] # Flatten positions
positions_str = '' + ' '.join([format_pos(x) for x in all_positions])
# print (positions_str)
complete_str = boxes_str + positions_str
if position:
position_index = bisect.bisect(all_positions, position)
#print ('bisect:', position_index)
if all_positions[position_index-1] == position:
position_line = ' ' *(15*(position_index-1)) + format_pos(position)
else:
position_line = ' ' *(15*position_index-7) + format_pos(position)
complete_str = complete_str + '\n' + position_line
return complete_str
#print (tr_data)
#isoform_map('ENST00000433179.3')
def test_isoform_map():
'''
For all transcripts check isoform maps
'''
print ('Generating all possible isoforms!')
d= open('del.isoforms', 'w')
for counter, transcript in enumerate(consts['transcripts']):
if counter % 100 == 0:
print ('Isoforms:', counter)
m = isoform_map(transcript)
d.write(m)
d.write('\n')
#print (m)
if counter > 10000:
break
print ('Testing done. Total: ', counter)
d.close()
def test_isoform_map_2():
print (isoform_map('ENST00000332831.4'))
print (isoform_map('ENST00000332831.4', 685700))
print (isoform_map('ENST00000332831.4', 686000))
print (isoform_map('ENST00000332831.4', 686660))
print (isoform_map('ENST00000332831.4', 685679))
print (isoform_map('ENST00000332831.4', 685716))
print (isoform_map('ENST00000332831.4', 686673))
def search_intervals(starts, ends, position):
'''
Assumes intervals of numbers.
zip(starts, ends)
which indexes of these contain the position?
'''
assert len(starts) == len(ends)
if position < starts[0]:
return () ## Too small position
if position > ends[-1]:
return ()
start_position = bisect.bisect_right(starts, position)
# We do know that all position right from start_position are not suitable
#print (start_position)
if start_position == 0:
return () ## Too small position
if start_position == len(starts):
start_position -= 1
ret = []
offset = 0
while True:
current_pos = start_position-offset
if current_pos < 0:
break
current_start = starts[current_pos]
current_end = ends[current_pos]
if current_start <= position <= current_end:
ret.append(current_pos)
else:
if offset>=1:
break
offset += 1
return ret
def test_search_intervals():
a = [20,50,80]
b = [40,60,100]
print (a)
print (b)
print (search_intervals(a,b,55), '1')
print (search_intervals(a,b,30), '0')
print (search_intervals(a,b,85), '2')
print (search_intervals(a,b,15), '<empty>')
print (search_intervals(a,b,110), '<empty>')
print (search_intervals(a,b,20), '0')
print (search_intervals(a,b,40), '0')
print (search_intervals(a,b,50), '1')
print (search_intervals(a,b,60), '1')
print (search_intervals(a,b,80), '2')
print (search_intervals(a,b,100), '2')
print (search_intervals(a,b,45), '<empty>', 45)
print (search_intervals(a,b,70), '<empty>', 70)
a = [20,50,80]
b = [60,60,100]
print (a)
print (b)
print (search_intervals(a,b,55), '1,0')
print (search_intervals(a,b,60), '1,0')
print (search_intervals(a,b,50), '1,0')
print (search_intervals(a,b,40), '0')
print (search_intervals(a,b,20), '0')
print (search_intervals(a,b,19), '<empty>')
print (search_intervals(a,b,85), '2')
print (search_intervals(a,b,80), '2')
print (search_intervals(a,b,100), '2')
print (search_intervals(a,b,105), '<empty>')
a = [20,50,80]
b = [100,100,100]
print (a)
print (b)
print (search_intervals(a,b,90), '2,1,0')
print (search_intervals(a,b,30), '0')
print (search_intervals(a,b,60), '0,1')
def search_empty_intervals(starts, ends, position):
'''
Get empty intervals
'''
start_position = bisect.bisect(starts, position)
if start_position == 0:
return (None, 0)
if start_position == len(starts):
return (start_position-1, None)
return (start_position-1, start_position)
def index_transcripts():
print ('Indexing transcripts..')
transcripts = consts['transcripts']
sorted_positions = sorted((v['chr'], v['start'], v['end'], k) for k, v in transcripts.items())
#consts['all_chromosomes'] = {v['chr'] for k,v in transripts.item()}
for chromosome, start, end, transcript in sorted_positions:
consts['starts'][chromosome].append(start)
consts['ends'][chromosome].append(end)
consts['transcripts_sorted'][chromosome].append(transcript)
print (' Indexing transcripts DONE.')
def index_genes():
print ('Indexing genes..')
transcripts = consts['transcripts']
# Create a dictionary. Key = gene, values = list of transcripts
genes = defaultdict(list)
_ = [genes[v['gene']].append(k) for k,v in transcripts.items()]
genes = {k:take_best_appris(v) for k,v in genes.items()}
consts['genes'] = genes
# Bug: Previous of first gene in chromosome 2 is the last of chromosome 1..
consts['genes_ordered'] = OrderedDict(sorted(((k, (transcripts[v]['chr'], transcripts[v]['start'], transcripts[v]['end'])) for k,v in genes.items()), key=lambda x: x[1]))
consts['genes_ordered_keys'] = list(consts['genes_ordered'].keys())
consts['genes_ordered_values'] = list(consts['genes_ordered'].values())
print (' Indexing genes DONE.')
def search_transcripts(chromosome, position):
'''
Get all the transcripts with that position
print(search_transcripts(686655)) # (True, ['ENST00000332831.4'])
print(search_transcripts(500000)) # (False, ['ENST00000426406.3', 'ENST00000332831.4'])
print(search_transcripts(50000)) # (False, [None, 'ENST00000335137.4'])
'''
found_intervals = search_intervals(consts['starts'][chromosome], consts['ends'][chromosome], position)
if found_intervals:
return True, [consts['transcripts_sorted'][chromosome][x] for x in found_intervals]
intervals = search_empty_intervals(consts['starts'][chromosome], consts['ends'][chromosome], position)
return False, [consts['transcripts_sorted'][chromosome][x] if x else None for x in intervals]
#======================QUESTIONS============================
def configure_regexp():
refseq = r'NM_[\d]+\.[\d]+|NM_[\d]+'
ensembl = r'ENST[\d]+\.[\d]+|ENST[\d]+'
chromosome = r'1|2|3|4|5|6|7|8|9|10|11|12|13|14|15|16|17|18|19|20|21|22|x|y|X|Y'
transcript = r'(?P<refseq>{})|(?P<ensembl>{})|(?P<gene>[\w]+)|(?P<hgvs_chromosome>{})'.format(refseq, ensembl, chromosome)
chromosome_decl = r'chromosome|chr'
split_chr_position = r':|,|and[\s]+position|and'
rs = r'rs[\d]+'
mutation_change = r'[ACGT]>[ACGT]'
chr_mutation = r'({})?[\s]*(?P<chromosome>{})[\s]*({})[\s]*(?P<position>[\d]+)[\s]*(?P<change>{})?'.format(chromosome_decl, chromosome, split_chr_position, mutation_change) # chr 1 : 234234234
hgvs_mutation = r'({}):[cg]\.[\d]+({})'.format(transcript, mutation_change)
mutation=r'(?P<rs>{})|(?P<hgvs_mutation>{})|(?P<chr_mutation>{})'.format(rs, hgvs_mutation, chr_mutation)
#print (mutation)
mut_reg = re.compile(mutation)
consts['mut_regexp'] = mut_reg
if False: # To test
m = mut_reg.search('rs12345')
print(m.groupdict())
m = mut_reg.search('1:123')
print(m.groupdict())
m = mut_reg.search('chr1:123')
print(m.groupdict())
m = mut_reg.search('chromosome 1:123')
print(m.groupdict())
m = mut_reg.search('chromosome 1 and position 123')
print(m.groupdict())
m = mut_reg.search('chromosome 1 and 123')
print(m.groupdict())
m = mut_reg.search('NM_234234:c.100A>G')
print(m.groupdict())
m = mut_reg.search('ENST00022222:c.100A>G')
print(m.groupdict())
m = mut_reg.search('2:g.100A>G')
print(m.groupdict())
m = mut_reg.search('chr2:g.100A>G')
print(m.groupdict())
def get_location_from_regexp_match(match):
'''
python project.py --question " rs58991260 " --> Chromosome: 1 Position: 218458480 . bnelongs in between transcripts
python project.py --question " rs262656 "
python project.py --question " rs780472451 " --> Coding Sequence Variant
python project.py --question " 1:2157315 "
python project.py --question " AGT:c.803T>C "
python project.py --question " NM_003193.4:c.464T>A "
python project.py --question " NM_003002.3:c.274G>T "
python project.py --question " ENST00000003084:c.1431T>G "
'''
#print (match)
if match['rs']:
location, change = myvariant(match['rs'], task='fetch_location_of_dbsnp')
return location, change
if match['chr_mutation']:
location = {'chromosome': match['chromosome'], 'position': int(match['position'])}
change = match['change']
return location, chang
if match['hgvs_mutation']:
# There are three posibilities: GENE, ENSEMBL, REFSEQ
if match['gene']:
location, change = vep(match['hgvs_mutation'], task="fetch_location_from_HGVS") # AGT:c.803T>C
return location, change
elif match['refseq']:
location, change = mutalyzer(match['hgvs_mutation'], task='') # NM_003002.3:c.274G>T
return location, change
elif match['ensembl']:
location, change = vep(match['hgvs_mutation'], task="fetch_location_from_HGVS") # ENST00000003084:c.1431T>G
return location, change
else:
raise Exception('This should never happen')
else:
raise Exception('Could not extract location from match')
def answer(question):
'''
rs58991260 . in chromosome 1
'''
genes_ordered_values = consts['genes_ordered_values']
genes_ordered_keys = consts['genes_ordered_keys']
transcripts = consts['transcripts']
print ('The question asked is:')
print ('--> ' + question)
to_print = []
gene = None
transcript = None
primary_transcript = None
location = None
change = None
m = re.search(r'gene[\s]+([\w]+)', question)
if m:
gene = m.group(1)
gene = gene.upper()
if gene in consts['genes']:
transcript = consts['genes'][gene]
primary_transcript = transcript
else:
gene = None
#Check if there is a location in the question.
question_locations = consts['mut_regexp'].search(question)
if question_locations:
question_locations_d = question_locations.groupdict()
location, change = get_location_from_regexp_match(question_locations_d)
# Search for transcripts
found, transcripts_found = search_transcripts(**location)
if not found: # rs58991260
to_print.append('Position of : {} does not belong in any transcript'.format(question_locations.group(0)))
to_print.append('It belongs in between these transcripts')
to_print.append(' '.join(transcripts_found))
else: #rs262656
to_print.append('Found the following transcripts:')
for x in transcripts_found:
to_print.append(' {} --> Gene:{} appris: {}'.format(x, transcripts[x]['gene'], transcripts[x]['appris']))
primary_transcript = take_best_appris(transcripts_found)
gene = [transcripts[x]['gene'] for x in transcripts_found if x==primary_transcript][0]
transcript = primary_transcript
if location:
to_print.append('Chromosome: {chromosome} Position: {position}'.format(**location))
# Check # myvariant(location={'chromosome':9, 'position':107620835 }, task="fetch_variants_from_location")
# Get all HGVS and rs from this location
myvariant_variants = myvariant(location=copy.copy(location), task="fetch_variants_from_location")
to_print.append('Found variants on this location:')
for x in myvariant_variants:
to_print.append(' {variant} --> {rs}'.format(**x))
position_from = max(0, int(location['position'])-50)
position_to = int(location['position']) + 50
sequence = ensembl_region(location['chromosome'], position_from, position_to)
to_print.append('Reference Sequence: {} FROM:{}(-50) TO: {}(+50)'.format(location['chromosome'], position_from, position_to))
to_print.append(sequence)
if location and change:
to_print.append(myvariant(variant={
'chromosome':location['chromosome'],
'position': location['position'],
'change':change}, task="fetch_info_from_variant"))
if gene:
to_print.append('Gene: {}'.format(gene))
gene_info = mygene(gene, 'get_info')
to_print.append('Gene name: {}'.format(gene_info['name']))
to_print.append('Type of gene: {}'.format(gene_info['type_of_gene']))
to_print.append('Other Gene names:')
to_print.append(gene_info['other_names'])
to_print.append('Summary:')
to_print.append(gene_info['summary'])
to_print.append('Gene location information:')
to_print.append(gene_info['location'])
to_print.append('Refseq Translation:')
to_print.append(gene_info['refseq'])
to_print.append('Pathways:')
to_print.append(gene_info['pathway'])
to_print.append('This gene also exists on the following organisms:')
to_print.append(gene_info['taxonomy'])
if primary_transcript:
to_print.append('Primary Transcript: {}'.format(primary_transcript))
#if any((x in question for x in ['upstream', 'downstream', 'closer', 'closest'])) and gene:
if gene:
location_index = consts['genes_ordered_keys'].index(gene)
decorate_with_stars = lambda x,right : '*' + x + '*' if x==right else x
docorate_with_positions = lambda i : int_commas(genes_ordered_values[i][1]) + ' ' + decorate_with_stars(genes_ordered_keys[i], gene) + ' ' + int_commas(genes_ordered_values[i][2])
f = max(0, location_index-3)
t = min(len(consts['genes']), location_index+3)
to_show = [docorate_with_positions(i) for i in range(f,t+1)]
to_print.append('Downstream / Upstream genes:')
to_print.append((' '*4) + ' --> '.join(to_show))
if 'transcripts' in question and gene:
to_print.append('Transcripts of gene: {}'.format(gene))
tr = [k for k,v in transcripts.items() if v['gene'] == gene]
for i, x in enumerate(tr):
if x == primary_transcript:
to_print.append('{} {} *PRIMARY*'.format(i+1, x))
else:
to_print.append('{} {}'.format(i+1, x))
if transcript:
to_print.append('Isoform map of {}:'.format(transcript))
to_print.append(isoform_map(transcript, position = location['position'] if location else None))
print('\n'.join(to_print))
#==================================================
# MyVariant.info
# Example of result from variant.infgo from rs58991260
'''
{
"max_score": 15.177358,
"took": 3,
"total": 1,
"hits": [
{
"_id": "chr1:g.218631822G>A",
"_score": 15.177358,
"dbsnp": {
"_license": "https://goo.gl/Ztr5rl",
"allele_origin": "unspecified",
"alleles": [
{
"allele": "G",
"freq": 0.9784
},
{
"allele": "A",
"freq": 0.02157
}
],
"alt": "A",
"chrom": "1",
"class": "SNV",
"dbsnp_build": 129,
"flags": [
"ASP",
"G5",
"GNO",
"KGPhase1",
"KGPhase3",
"SLO"
],
"gmaf": 0.02157,
"hg19": {
"end": 218631822,
"start": 218631822
},
"ref": "G",
"rsid": "rs58991260",
"validated": false,
"var_subtype": "ts",
"vartype": "snp"
}
}
]
}
'''
def json_pprint(j):
print (json.dumps(j, indent=4))
def myvariant(variant=None, location=None, task=None):
'''
variant can be rs..
http://myvariant.info/v1/variant/chr9:g.107620835G>A
http://myvariant.info/v1/variant/chr9:g.107620835G>A?fields=dbsnp.rsid,dbsnp.vartype
http://myvariant.info/v1/query?q=chr9:107620835&fields=gnomad_genome.af,exac.af,dbnsfp.1000gp3.af
http://myvariant.info/v1/query?q=rs58991260
http://myvariant.info/v1/query?q=chr9:107620835&fields=_id,dbsnp.rsid
myvariant(location={'chromosome':9, 'position':107620835 }, task="fetch_variants_from_location")
myvariant(variant={'chromosome':19, 'position': 44908684, 'change':'T>C'}, task="fetch_info_from_variant")
'''
def lift_hgvs(hgvs):
s = re.search(r'chr([\d+XY]+):g\.([\d]+)([ACGT]>[ACGT])', hgvs)
p = lift(chromosome=s.group(1), position=int(s.group(2)))
return 'chr{}:g.{}{}'.format(s.group(1), p, s.group(3))
if task == 'fetch_location_of_dbsnp':
url = 'http://myvariant.info/v1/query'
parameters = {
'q': variant,
'fields': 'dbsnp',
#'hg38': 'true', # Makes no differnce
}
elif task == 'fetch_info_from_variant':
variant['position'] = lift(variant['chromosome'], variant['position'], inverse=True)
this_variant = 'chr{chromosome}:g.{position}{change}'.format(**variant)
url = f'http://myvariant.info/v1/variant/{this_variant}'
parameters = {
'fields': 'gnomad_genome.af,exac.af,dbnsfp.1000gp3.af,clinvar,snpeff.ann,cadd.polyphen,cadd.sift.cat'
}
elif task == 'fetch_variants_from_location':
url = 'http://myvariant.info/v1/query'
location['position'] = lift(location['chromosome'], location['position'], inverse=True)
parameters = {
'q': 'chr{chromosome}:{position}'.format(**location),
'fields': '_id,dbsnp.rsid',
#'hg38': 'true', # No effect ??
}
else:
raise Exception('Unknown task: {}'.format(task))
r = requests.get(url, params=parameters)
if not r.ok:
r.raise_for_status()
j = r.json()
#json_pprint(j)
if task == 'fetch_location_of_dbsnp':
chromosome = j['hits'][0]["dbsnp"]["chrom"]
position_19 = j['hits'][0]["dbsnp"]["hg19"]["start"]
position_38 = lift(chromosome, position_19)
change = '{}>{}'.format(j['hits'][0]['dbsnp']['ref'], j['hits'][0]['dbsnp']['alt'])
return {'chromosome': chromosome, 'position': position_38}, change
elif task == 'fetch_info_from_variant':
ret = []
if 'dbnsfp' in j:
if '1000gp3' in j['dbnsfp']:
if 'af' in j['dbnsfp']['1000gp3']:
ret.append('1000 Genomes Allele Frequency: {}'.format(j['dbnsfp']['1000gp3']['af']))
if 'exac' in j:
if 'af' in j['exac']:
ret.append('ExAC Allele Frequency: {}'.format(j['exac']['af']))
if 'gnomad_genome' in j:
if 'af' in j['gnomad_genome']['af']:
if 'af' in j['gnomad_genome']['af']:
ret.append('gnomAD Allele Frequency: {}'.format(j['gnomad_genome']['af']['af']))
if 'clinvar' in j:
if 'rcv' in j['clinvar']:
ret.append('Clinvar:')
ret.extend([' {} --> {} --> {} '.format(x['clinical_significance'], x['conditions']['name'], x['review_status']) for x in j['clinvar']['rcv']])
if 'snpeff' in j:
if 'ann' in j['snpeff']:
ret.append('snpeff:')
if type(j['snpeff']['ann']) is list:
#json_pprint(j['snpeff']['ann'])
ret.extend([' {}:{} --> {}'.format(x['feature_id'], x['hgvs_c'], x['effect']) for x in j['snpeff']['ann']])
else:
if 'effect' in j['snpeff']['ann']:
ret.append(' {}:{} --> {}'.format(j['snpeff']['ann']['feature_id'], j['snpeff']['ann']['hgvs_c'], j['snpeff']['ann']['effect']))
if 'cadd' in j:
if 'polyphen' in j['cadd']:
if 'cat' in j['cadd']['polyphen']:
ret.append('polyphen category: {}'.format(j['cadd']['polyphen']['cat']))
if 'sift' in j['cadd']:
if 'cat' in j['cadd']['sift']:
ret.append('SIFT category: {}'.format(j['cadd']['sift']['cat']))
ret = '\n'.join(ret)
return ret
elif task == 'fetch_variants_from_location':
# http://myvariant.info/v1/query?q=chr9:107620835&fields=_id,dbsnp.rsid
return [{'variant': lift_hgvs(x['_id']), 'rs': x.get('dbsnp', {'rsid': None})['rsid']} for x in j['hits']]
else:
raise Exception('Unknown task: {}'.format(task))
#================End of MyVariant.info #############
#==================================================
#==============VEP =====================
'''
[
{
"id": "AGT:c.803T>C",
"seq_region_name": "1",
"strand": -1,
"colocated_variants": [
{
"start": "230710048",
"strand": "1",
"seq_region_name": "1",
"end": "230710048",
"allele_string": "HGMD_MUTATION",
"phenotype_or_disease": "1",
"id": "CM920010"
},
{
"strand": "1",
"seq_region_name": "1",
"start": "230710048",
"minor_allele": "A",
"phenotype_or_disease": "1",
"minor_allele_freq": "0.2949",
"frequencies": {
"C": {
"gnomad_sas": "0.6204",
"eas": "0.8532",
"amr": "0.6354",
"gnomad_nfe": "0.4202",
"gnomad_asj": "0.4391",
"gnomad_oth": "0.5164",
"aa": "0.8268",
"gnomad_amr": "0.7185",
"gnomad_fin": "0.4394",
"afr": "0.9032",
"ea": "0.4258",
"sas": "0.636",
"gnomad": "0.5464",
"gnomad_afr": "0.8485",
"gnomad_eas": "0.8378",
"eur": "0.4115"
}
},
"clin_sig": [
"benign",
"risk_factor"
],
"id": "rs699",
"end": "230710048",
"pubmed": [
"21988197",
"9831339",
"20486282",
"21681796"
],
"allele_string": "A/G"
}
],
"most_severe_consequence": "missense_variant",
"input": "AGT:c.803T>C",
"allele_string": "T/C",
"transcript_consequences": [
{
"cds_start": 803,
"polyphen_prediction": "benign",
"sift_prediction": "tolerated",
"sift_score": 1,
"gene_symbol": "AGT",
"biotype": "protein_coding",
"gene_id": "ENSG00000135744",
"protein_end": 268,
"cdna_start": 1018,
"polyphen_score": 0,
"hgnc_id": "HGNC:333",
"amino_acids": "M/T",
"consequence_terms": [
"missense_variant"
],
"codons": "aTg/aCg",
"protein_start": 268,
"variant_allele": "C",
"cds_end": 803,
"impact": "MODERATE",
"strand": -1,
"transcript_id": "ENST00000366667",
"gene_symbol_source": "HGNC",
"cdna_end": 1018
},
{
"variant_allele": "C",
"transcript_id": "ENST00000412344",
"gene_id": "ENSG00000244137",
"strand": -1,
"impact": "MODIFIER",
"distance": 650,
"gene_symbol_source": "Clone_based_ensembl_gene",
"gene_symbol": "AL512328.1",
"consequence_terms": [
"downstream_gene_variant"
],
"biotype": "antisense"
}
],
"end": 230710048,
"assembly_name": "GRCh38",
"start": 230710048
}
]
'''
def vep(variant, task):
'''
https://rest.ensembl.org/documentation/info/vep_hgvs_get
AGT:c.803T>C
'''
server = "https://rest.ensembl.org"
ext = "/vep/human/hgvs/{}?".format(variant)
parameters = {
"content-type": "application/json"
}
r = requests.get(server+ext, headers={ "Content-Type" : "application/json"}, params=parameters)
if not r.ok:
r.raise_for_status()
decoded = r.json()
#json_pprint(decoded)
chromosome = decoded[0]['seq_region_name']
position = int(decoded[0]['start'])
change = decoded[0]['allele_string']
if task == 'fetch_location_from_HGVS':
location = {'chromosome': chromosome, 'position': position}
return location, change
else:
raise Exception('Uknown task: {}'.format(task))
#==========END OF VEP ==================
### ensembl region ###
def ensembl_region(chromosome, f, t):
'''
ensembl_region('X', 1000000, 1000100)
https://rest.ensembl.org/sequence/region/human/X:1000000..1000100:1?content-type=text/plain
'''
if t<=f:
raise Exception('from should be greater than to')
url = f'https://rest.ensembl.org/sequence/region/human/{chromosome}:{f}..{t}:1'
parameters = {
'content-type' : 'text/plain',
}
r = requests.get(url, params=parameters)
if not r.ok:
r.raise_for_status()
t = r.text
return t
### End of ensebl region
########### mutalyzer ##############
def mutalyzer(variant, task):
'''
https://mutalyzer.nl/soap-api
https://mutalyzer.nl/json/runMutalyzer?variant=NM_003193.4:c.464T%3EA
NM_003193.4:c.464T>A
NM_003002.3:c.274G>T
https://mutalyzer.nl/json/numberConversion?variant=NM_003002.3:c.274G%3ET&build=hg38
'''
url='https://mutalyzer.nl/json/numberConversion'
parameters = {
"variant" : variant,
"build" : "hg38",
}
r = requests.get(url, params=parameters)
if not r.ok:
r.raise_for_status()
decoded = r.json() # NC_000011.10:g.112088971G>T
m = re.search(r'NC_([\d]+)\.[\d]+:g\.([\d]+)([ACGT]>[ACGT])', decoded[0])
chromosome = int(m.group(1))
position = int(m.group(2))
change = m.group(3)
location = {'chromosome': chromosome, 'position': position}
return location, change
########### end of mutalyzer ########
############ mygene.info #######################
def mygene_taxonomy(gene):
tax_names = {
9606 : ('human', 'Homo sapiens'),
10090 : ('mouse', 'Mus musculus'),
10116 : ('rat', 'Rattus norvegicus'),
7227 : ('fruitfly', 'Drosophila melanogaster'),
6239 : ('nematode', 'Caenorhabditis elegans'),
7955 : ('zebrafish', 'Danio rerio'),
3702 : ('thale-cress', 'Arabidopsis thaliana'),
8364 : ('frog', 'Xenopus tropicalis'),
9823 : ('pig', 'Sus scrofa'),
}
url = 'http://mygene.info/v3/query'
parameters = {
'q': "symbol:{}".format(gene),
'fields': 'taxid'
}
r = requests.get(url, params = parameters)
if not r.ok:
raise r.raise_for_status()
decoded = r.json()
#json_pprint(decoded)
tax = list(filter(bool, [tax_names.get(int(x['taxid']), False) for x in decoded['hits']]))
if tax:
ret = '\n'.join([' {} --> {}'.format(*x) for x in tax])
return ret
return ''
def mygene(gene, task):
'''
By default is in hg38
All fields
http://docs.mygene.info/en/latest/doc/data.html
http://mygene.info/v3/query?q=symbol:tpmt&fields=name&species=human
Test with: mygene('TPMT', 'get_names')
'''
def is_mygene_chromosome(x):
'''
Exclude things like: "chr": "CHR_HSCHR1_5_CTG32_1"
'''
return x.upper() in list(map(str, range(1,23))) + ['X', 'Y']
url = 'http://mygene.info/v3/query'
if task == 'get_info':
fields = 'name,other_names,map_location,genomic_pos,refseq,pathway,summary,type_of_gene'
else:
raise Exception('Unknown task for mygene: {}'.format(task))
parameters = {
'q': gene,
'species' : 'human',
'fields': fields,
#'hg38': 'true',
}
r = requests.get(url, params=parameters)
if not r.ok:
r.raise_for_status()
decoded = r.json()
#json_pprint(decoded)
first_hit = decoded['hits'][0]
if type(first_hit['genomic_pos']) is list:
chromosomes = {x['chr'] for x in first_hit['genomic_pos'] if is_mygene_chromosome(x['chr'])}
assert len(chromosomes) == 1 # Can it have more than in chromosomes?
chromosome = list(chromosomes)[0]
location = '\n'.join(["Ensemble Gene: {ensemblgene} Chromosome: {chr} Start: {start} End:{end}".format(**x) for x in first_hit['genomic_pos'] if x['chr'] == chromosome])
else:
chromosome = first_hit['genomic_pos']['chr']
location = "Ensemble Gene: {ensemblgene} Chromosome: {chr} Start: {start} End:{end}".format(**first_hit['genomic_pos'])
# Create refseq
refseq = '\n'.join([" {rna} --> {protein} ".format(**x) for x in first_hit['refseq']['translation']])
# Pathway
pathway = ''
for pathway_db in ['kegg', 'pharmgkb', 'reactome', 'smpdb', 'wikipathways']:
if pathway_db in first_hit['pathway']:
#print (pathway_db, first_hit['pathway'][pathway_db])
pathway += pathway_db + '\n'
if type(first_hit['pathway'][pathway_db]) is list:
pathway += '\n'.join([" {id} --> {name}".format(**x) for x in first_hit['pathway'][pathway_db]])
else:
pathway += " {id} --> {name}".format(**first_hit['pathway'][pathway_db])
ret = {
'name' : first_hit['name'],
'other_names' : '\n'.join(map(lambda x: ' ' + x, first_hit['other_names'])),
'map_location': first_hit['map_location'],
'location' : location,
'refseq' : refseq,
'pathway': pathway,
'summary': first_hit['summary'],
'taxonomy': mygene_taxonomy(gene),
'type_of_gene': first_hit['type_of_gene']
}
return ret
############ end of mygene.info ################
def report_errors():
if not consts['errors']:
return
print ('Errors:')
for k,v in consts['errors'].items():
print (consts['messages'][k],v)
if __name__ == '__main__':
'''
rs429358 . APOE
'''
configure_regexp()
parse_gff3(consts['gencode'])
index_transcripts()
index_genes()
parser = argparse.ArgumentParser(description='The best python project ever')
parser.add_argument('-q', '--question', help='Your question in english', required=False)
args = parser.parse_args()
question = args.question
if question:
answer(question)
report_errors()
#test_isoform_map()
Display the source blob
Display the rendered blob
Raw
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment