Last active
April 3, 2020 17:33
-
-
Save kantale/0d11071991a6cfaa23775f564e1a27d7 to your computer and use it in GitHub Desktop.
Υλοποίηση του project στο μάθημα "Εισαγωγή στον προγραμματισμό με τη γλώσσα python" 2018-2019
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import re | |
import copy | |
import gzip | |
import json | |
import bisect | |
import argparse | |
import requests | |
from collections import defaultdict, OrderedDict | |
from pyliftover import LiftOver | |
''' | |
Create environment | |
/Users/alexandroskanterakis/anaconda3/bin/conda create -n python_project python=3.7 | |
Start environment | |
source /Users/alexandroskanterakis/anaconda3/bin/activate python_project | |
pip install requests | |
pip install pyliftover | |
Deactivate
source deactivate | |
Remove environment | |
/Users/alexandroskanterakis/anaconda3/bin/conda remove --name python_project --all | |
''' | |
# Global, module-wide state shared by every function below.
consts = {
    'gencode': 'gencode.v29.annotation.gff3.gz',  # GENCODE annotation file to parse
    'transcripts': {}, # Transcript data
    'lo': LiftOver('hg19', 'hg38'),  # hg19 -> hg38 coordinate converter
    'lo_inv': LiftOver('hg38', 'hg19'),  # hg38 -> hg19 coordinate converter
    'errors': defaultdict(int),  # Counters of records ignored while parsing
    'messages': {
        'pseudoautosomal_lines' : 'pseudoautosomal genes were ignored. Number of lines found:',
        'pseudoautosomal_transcripts': 'pseudoautosomal genes were ignored. Number of transcripts found:',
    },
    'starts': defaultdict(list),  # Per chromosome: transcript start positions, sorted
    'ends': defaultdict(list),  # Per chromosome: end positions, parallel to 'starts'
    'transcripts_sorted': defaultdict(list),  # Per chromosome: transcript ids, parallel to 'starts'
}
def lift(chromosome, position, inverse=False):
    '''
    Convert a genomic position between assemblies with pyliftover.

    chromosome: chromosome name, with or without the 'chr' prefix (str or int).
    position: position to convert (pyliftover uses 0-based coordinates).
    inverse: if False convert hg19 -> hg38, if True convert hg38 -> hg19.
    Returns the converted position.
    Raises Exception when the liftover fails or chromosome has an unknown type.
    '''
    # Normalize the chromosome name to the 'chrN' form pyliftover expects.
    if isinstance(chromosome, str):
        if 'chr' not in chromosome:
            chromosome = 'chr' + chromosome
    elif isinstance(chromosome, int):
        chromosome = 'chr' + str(chromosome)
    else:
        raise Exception('Unknown type of parameter chromosome: {}'.format(type(chromosome).__name__))
    # Pick the converter once instead of duplicating the call in each branch.
    lifter = consts['lo_inv'] if inverse else consts['lo']
    s = lifter.convert_coordinate(chromosome, position)
    if len(s) > 1:
        print ('WARNING: liftover returned more than one result. Taking the first..')
    if len(s) == 0:
        raise Exception('Liftover failed..')
    # convert_coordinate returns (chromosome, position, strand, score) tuples.
    return s[0][1]
def parse_gff3_line(l):
    '''
    Parse one data line of a GFF3 annotation file.

    l: a single non-comment line of the file.
    Returns a dict with keys chromosome, source, type, start, end and
    annotation (the key=value pairs of column 9), or None for record
    types that are deliberately ignored.
    '''
    # GFF3 is a tab-delimited format: split on tabs so that attribute
    # values containing spaces (column 9) are not broken apart.
    ls = l.rstrip('\n').split('\t')
    # Read chromosome and make sure it looks like 'chr' + number/X/Y/M(T)
    chromosome = ls[0]
    assert re.match(r'chr[\dXYMT]+', chromosome)
    # Annotation source; make sure that this is either 'HAVANA' or 'ENSEMBL'
    source = ls[1]
    assert source in ['HAVANA', 'ENSEMBL']
    # Type of the regulatory element (gene, transcript, exon, ...)
    t_type = ls[2]
    if t_type in ['stop_codon_redefined_as_selenocysteine']:
        return None  # Not needed downstream
    # Read start and end (GFF3 coordinates are 1-based, inclusive)
    start = int(ls[3])
    end = int(ls[4])
    # Read annotation (column 9). For example:
    # ID=exon:ENST00000450305.2:1;Parent=ENST00000450305.2;gene_id=ENSG00000223972.5 ..
    # split('=', 1) keeps values that themselves contain '=' intact.
    annotation = {key: value for key, value in [x.split('=', 1) for x in ls[8].split(';')]}
    return {
        'chromosome': chromosome,
        'source': source,
        'type': t_type,
        'start': start,
        'end': end,
        'annotation': annotation
    }
def decode_appris(appris):
    '''
    Map an APPRIS tag to an integer rank.
    The lower the rank, the better the transcript.
    '''
    ranks = {
        'appris_principal_1': 0,
        'appris_principal_2': 1,
        'appris_principal_3': 2,
        'appris_principal_4': 3,
        'appris_principal_5': 4,
        'appris_alternative_1': 5,
        'appris_alternative_2': 6,
    }
    return ranks[appris]
def take_best_appris(transcripts):
    '''
    From a set of transcripts, return the one with the lowest APPRIS rank.
    Ties are resolved by input order (the first minimal element wins).
    '''
    # FIX ME !!! MIGHT BE MORE THAN LOWEST!!!!
    return min(transcripts, key=lambda t: consts['transcripts'][t]['appris'])
def decode_chromosome(chromosome):
    '''
    Strip the 'chr' prefix and return the bare chromosome name as a
    string (e.g. 'chr1' -> '1', 'chrX' -> 'X').
    '''
    matched = re.match(r'chr([\dXYMT]+)', chromosome)
    if not matched:
        raise Exception('Cannot decode chromosome: {}'.format(chromosome))
    return matched.group(1)
def store_transcript_data(gff3_data):
    '''
    Accumulate one parsed GFF3 record into consts['transcripts'].

    gff3_data: the dictionary returned by parse_gff3_line.
    Creates the transcript entry on first sight and appends the record's
    (start, end, exon_number) tuple to the list matching its type.
    Pseudoautosomal records are counted in consts['errors'] and skipped.
    '''
    chromosome = decode_chromosome(gff3_data['chromosome'])
    annotations = gff3_data['annotation']
    transcript_id = annotations['transcript_id']
    gene = annotations['gene_name']
    t_type = gff3_data['type']
    if '_PAR_' in annotations['ID']:
        # These are pseudoautosomal genes
        # They have the same transcript id and exist in both X and Y chromosomes..
        consts['errors']['pseudoautosomal_lines'] += 1
        if t_type == 'transcript':
            consts['errors']['pseudoautosomal_transcripts'] += 1
        return
    start = gff3_data['start']
    end = gff3_data['end']
    # exon_number is only present on exon-level records; -1 marks "absent"
    exon_number = int(annotations.get('exon_number', -1))
    # Check that exon number makes sense...
    if t_type in ['CDS', 'exon', 'start_codon', 'stop_codon', 'three_prime_UTR', 'five_prime_UTR']:
        assert exon_number>0
    location_tuple = (start, end, exon_number)
    # Read appris information from the comma-separated 'tag' attribute
    appris = [x for x in annotations['tag'].split(',') if 'appris' in x]
    assert len(appris) == 1 # Cannot have more than one appris..
    appris = appris[0]
    #print (transcript_id)
    transcripts = consts['transcripts']
    if not transcript_id in transcripts:
        # First time we see this transcript: create an empty skeleton entry
        transcripts[transcript_id] = {
            'gene': gene,
            'chr': chromosome,
            'start': None,
            'end': None,
            'exons': [],
            'cds': [],
            '5utr': [],
            '3utr': [],
            'start_c': [], # Start codon
            'stop_c': [], # Stop codon
            'appris': None,
        }
    transcript_info = transcripts[transcript_id]
    # Some quality control: a transcript id must always map to the same gene
    assert transcript_info['gene'] == gene
    # Store transcript information in relevant fields
    if t_type == 'transcript':
        # The 'transcript' record must appear exactly once per transcript,
        # hence the None checks before filling appris/start/end.
        assert transcript_info['appris'] is None
        transcript_info['appris'] = decode_appris(appris)
        assert transcript_info['start'] is None
        assert transcript_info['end'] is None
        transcript_info['start'] = start
        transcript_info['end'] = end
    elif t_type == 'exon':
        transcript_info['exons'].append(location_tuple)
    elif t_type == 'CDS':
        transcript_info['cds'].append(location_tuple)
    elif t_type == 'start_codon':
        #assert transcript_info['start_c'] is None # Cannot have two start codons.. Yet.. some.. have
        transcript_info['start_c'].append(location_tuple)
    elif t_type == 'stop_codon':
        #assert transcript_info['stop_c'] is None # Cannot have two stop codons..
        transcript_info['stop_c'].append(location_tuple)
    elif t_type == 'five_prime_UTR':
        transcript_info['5utr'].append(location_tuple)
    elif t_type == 'three_prime_UTR':
        transcript_info['3utr'].append(location_tuple)
    else:
        raise Exception('Unknown regulatory element: {}'.format(t_type))
def parse_gff3(fn, total_lines=2739404):
    '''
    Parse a gzip-compressed GFF3 file and populate consts['transcripts'].

    fn: path to the gzipped GFF3 file.
    total_lines: expected number of lines, used only for the progress
        percentage. Defaults to the line count of gencode v29, which was
        previously hard-coded inline.
    '''
    print ('Start parsing:', fn)
    with gzip.open(fn, 'rt') as f:
        # enumerate replaces the manual line counter
        for line_counter, l in enumerate(f, start=1):
            if line_counter % 100000 == 0:
                print ('Lines: {0} {1:.2f}%'.format(line_counter, 100*line_counter/total_lines))
            if l[0] == '#': # This is a comment
                continue
            if 'appris' not in l: # Ignore transcripts without appris information
                continue
            gff3_data = parse_gff3_line(l)
            if not gff3_data:
                continue
            store_transcript_data(gff3_data)
    print ('Done parsing:', fn)
def int_commas(i):
    '''
    Format an integer with thousands separators.
    1234 --> 1,234
    12345 --> 12,345
    123456 --> 123,456
    1234567 --> 1,234,567
    ...
    '''
    # The format mini-language does digit grouping natively and, unlike
    # the previous hand-rolled reversed-slicing, also handles negative
    # numbers correctly (e.g. -123 no longer becomes '-,123').
    return '{:,}'.format(int(i))
def format_pos(p):
    '''
    Render a position as a comma-grouped string centered in a field of
    exactly 11 characters. When the padding is odd, the extra space goes
    on the right-hand side.
    '''
    text = int_commas(p)
    total_pad = 11 - len(text)
    left_pad = total_pad // 2
    right_pad = total_pad - left_pad
    result = '{}{}{}'.format(' ' * left_pad, text, ' ' * right_pad)
    assert len(result) == 11
    return result
def format_exon(p):
    '''
    Format an exon number in a 2-character field: single digits get a
    trailing space, larger numbers are returned as-is.
    '''
    text = str(p)
    return text + ' ' if p < 10 else text
def exon_box_tag(name):
    '''
    Translate an internal element key ('5utr', 'cds', ...) to the label
    printed inside an exon box.
    '''
    labels = {
        '5utr': "5'UTR",
        '3utr': "3'UTR",
        'utr5': "5'UTR",
        'utr3': "3'UTR",
        'cds': ' CDS',
    }
    return labels[name]
def exon_str_1(number, first):
    '''
    Draw the box of an exon that consists of a single element
    (CDS-only or UTR-only). 14 interior columns; together with a
    following join wedge the box spans two 15-character position slots.

    print (exon_str_1(50))
    '''
    return '''
╔══════════════╗
║    Exon {}   ║
╟──────────────╢
║     {}    ║
╟──────────────╢
║              ║
║              ║
    '''.format(format_exon(number), exon_box_tag(first))
def exon_str_2(number, first, second):
    '''
    Draw the box of an exon made of two elements (e.g. CDS + UTR),
    as two 14-column cells separated by a vertical divider.

    print (exon_str_2(4, '3utr', 'cds'))
    print (exon_str_2(4, 'cds', '5utr'))
    print (exon_str_2(4, 'cds', 'cds'))
    print (exon_str_2(4, '3utr', '3utr'))
    '''
    return '''
╔═════════════════════════════╗
║           Exon {}           ║
╟──────────────┬──────────────╢
║     {}    │     {}    ║
╟──────────────┼──────────────╢
║              │              ║
║              │              ║
    '''.format(format_exon(number), exon_box_tag(first), exon_box_tag(second))
def exon_str_3(number, first, second, third):
    '''
    Draw the box of an exon made of three elements (UTR + CDS + UTR),
    as three 14-column cells separated by vertical dividers.
    '''
    return '''
╔════════════════════════════════════════════╗
║                  Exon {}                   ║
╟──────────────┬──────────────┬──────────────╢
║     {}    │     {}    │     {}    ║
╟──────────────┼──────────────┼──────────────╢
║              │              │              ║
║              │              │              ║
    '''.format(format_exon(number), exon_box_tag(first), exon_box_tag(second), exon_box_tag(third))
def exon_join():
    '''
    The wedge drawn between two consecutive exon boxes
    (same line count as the exon boxes so they can be zipped together).

    Oh christmas tree...
    '''
    return '''
      ╱╲      
     ╱  ╲     
    ╱    ╲    
   ╱      ╲   
  ╱        ╲  
 ╱          ╲ 
╱            ╲
    '''
def space_check(space_1, space_2):
    '''
    Compare two non-overlapping (start, end) intervals.

    Returns True if space_1 is entirely BEFORE space_2.
    Returns False if space_1 is entirely AFTER space_2.
    Throws exception in case of overlap.
    '''
    if space_1[0] <= space_1[1] <= space_2[0] <= space_2[1]:
        return True
    # BUG FIX: the original second condition compared space_2[0] with
    # itself, so any overlapping pair silently returned False instead of
    # raising. This is the symmetric "space_2 entirely before space_1" check.
    if space_2[0] <= space_2[1] <= space_1[0] <= space_1[1]:
        return False
    raise Exception('Space overlap: {} {}'.format(space_1, space_2))
def exon_map(exon, cds, utr_3, utr_5):
    '''
    Build the ASCII box of a single exon.

    exon / cds / utr_3 / utr_5: (start, end, exon_number) tuples, or None
    when the element is absent from this exon.
    Returns (box_string, edge_positions), where edge_positions holds the
    genomic coordinates attached to the vertical edges of the box.
    '''
    exon_number = exon[2]
    if utr_3 is None and utr_5 is None:
        assert exon == cds # If no utrs are present exon and cds should coincide
        return exon_str_1(exon_number, 'cds') , (exon[0], exon[1])
    elif cds is None and utr_5 is None:
        assert exon == utr_3
        return exon_str_1(exon_number, 'utr3'), (exon[0], exon[1])
    elif cds is None and utr_3 is None:
        assert exon == utr_5
        return exon_str_1(exon_number, 'utr5'), (exon[0], exon[1])
    elif utr_3 is None:
        assert utr_5 # Redundant but ok..
        # Which is first, CDS or UTR5?
        s_check = space_check(cds, utr_5)
        if s_check:
            assert exon[0] == cds[0]
            assert cds[1] == utr_5[0] - 1
            assert exon[1] == utr_5[1]
            return exon_str_2(exon_number, 'cds', 'utr5'), (exon[0], cds[1], exon[1])
        else:
            assert exon[0] == utr_5[0]
            assert utr_5[1] == cds[0] - 1
            assert exon[1] == cds[1]
            return exon_str_2(exon_number, 'utr5', 'cds'), (exon[0], utr_5[1], exon[1])
    elif utr_5 is None:
        assert utr_3 # Redundancy is bliss..
        s_check = space_check(cds, utr_3)
        if s_check:
            assert exon[0] == cds[0]
            assert cds[1] == utr_3[0] - 1
            assert exon[1] == utr_3[1]
            # BUG FIX: the middle edge is the END of the CDS (cds[1]).
            # The original returned cds[0], which equals exon[0] (see the
            # assert above), producing a duplicated left edge. This now
            # mirrors the symmetric cds/utr5 case.
            return exon_str_2(exon_number, 'cds', 'utr3'), (exon[0], cds[1], exon[1])
        else:
            assert exon[0] == utr_3[0]
            assert utr_3[1] == cds[0] - 1
            assert exon[1] == cds[1]
            return exon_str_2(exon_number, 'utr3', 'cds'), (exon[0], utr_3[1], exon[1])
    else:
        assert cds
        assert utr_3
        assert utr_5
        # e.g. (69055, 70108, 1) (69091, 70008, 1) (70009, 70108, 1) (69055, 69090, 1)
        # Which is first, utr_3 or utr_5 ?
        s_check = space_check(utr_3, utr_5)
        # NOTE(review): unlike the two-element cases above, the inner edges
        # here are the STARTS of the second and third segments - confirm
        # whether segment ends were intended for consistent rendering.
        if s_check:
            assert utr_3[0] <= utr_3[1] <= cds[0] <= cds[1] <= utr_5[0] <= utr_5[1]
            return exon_str_3(exon_number, 'utr3', 'cds', 'utr5'), (exon[0], cds[0], utr_5[0], exon[1])
        else:
            assert utr_5[0] <= utr_5[1] <= cds[0] <= cds[1] <= utr_3[0] <= utr_3[1]
            return exon_str_3(exon_number, 'utr5', 'cds', 'utr3'), (exon[0], cds[0], utr_3[0], exon[1])
    raise Exception('This should never happen..')
def isoform_map(transcript, position=None):
    '''
    Draw an ASCII map of all exons of a transcript.

    transcript: transcript id (a key of consts['transcripts']).
    position: optional genomic position to highlight under the map.
    Returns the complete multi-line string.
    '''
    tr_data = consts['transcripts'][transcript]
    def element_getter(element, exon_number):
        # Fetch the (start, end, exon_number) tuple of the given element
        # type for one exon; None when the exon has no such element.
        ret = [x for x in tr_data[element] if x[2]==exon_number]
        if len(ret) > 1:
            raise Exception('Transcript: {} exon_number: {} has more than one: {}'.
                format(transcript, exon_number, element))
        if ret:
            return ret[0]
        return None # Do not have to.. but always nice and readable
    # Order exons according to position
    exons_ordered = sorted(tr_data['exons'])
    boxes = []
    positions = []
    for exon in exons_ordered:
        exon_number = exon[2]
        # Get all elements of the exon
        cds = element_getter('cds', exon_number)
        utr_3 = element_getter('3utr', exon_number)
        utr_5 = element_getter('5utr', exon_number)
        if boxes:
            boxes.append(exon_join())  # wedge between consecutive boxes
        exon_box, exon_positions = exon_map(exon, cds, utr_3, utr_5)
        boxes.append(exon_box)
        positions.append(exon_positions)
    boxes = [x.split('\n')[1:] for x in boxes] # Remove first line
    assert len({len(x) for x in boxes}) <= 1 # Each "box" should have the same number of lines
    boxes_str = '\n'.join([' '+''.join(x) for x in zip(*boxes)]) # "Zip" boxes together
    # print (boxes_str)
    boxes_str = boxes_str[:-5] # Remove enter at the end of boxes
    all_positions = [y for x in positions for y in x] # Flatten positions
    # NOTE(review): the highlight math below assumes a 15-character pitch
    # (11-char format_pos + 4-space separator); the separator literal here
    # may have lost whitespace in transit - verify against the original.
    positions_str = '' + ' '.join([format_pos(x) for x in all_positions])
    # print (positions_str)
    complete_str = boxes_str + positions_str
    if position:
        # Find where the highlighted position falls among the box edges
        position_index = bisect.bisect(all_positions, position)
        #print ('bisect:', position_index)
        if all_positions[position_index-1] == position:
            # Exactly on an edge: align under that edge
            position_line = ' ' *(15*(position_index-1)) + format_pos(position)
        else:
            # Between edges: shift roughly half a slot left of the next edge
            position_line = ' ' *(15*position_index-7) + format_pos(position)
        complete_str = complete_str + '\n' + position_line
    return complete_str
#print (tr_data) | |
#isoform_map('ENST00000433179.3') | |
def test_isoform_map():
    '''
    For all transcripts check isoform maps: generate the map of (up to)
    the first ~10000 transcripts and dump them to the file 'del.isoforms'.
    '''
    print ('Generating all possible isoforms!')
    counter = 0  # defined up-front so the final print works on empty input
    # 'with' guarantees the file is closed even when isoform_map raises;
    # the original leaked the handle on any exception.
    with open('del.isoforms', 'w') as d:
        for counter, transcript in enumerate(consts['transcripts']):
            if counter % 100 == 0:
                print ('Isoforms:', counter)
            m = isoform_map(transcript)
            d.write(m)
            d.write('\n')
            #print (m)
            if counter > 10000:
                break
    print ('Testing done. Total: ', counter)
def test_isoform_map_2():
    '''
    Eyeball test: print the isoform map of one transcript without a
    highlight, then with several highlighted positions (on and between
    the exon edges).
    '''
    print (isoform_map('ENST00000332831.4'))
    print (isoform_map('ENST00000332831.4', 685700))
    print (isoform_map('ENST00000332831.4', 686000))
    print (isoform_map('ENST00000332831.4', 686660))
    print (isoform_map('ENST00000332831.4', 685679))
    print (isoform_map('ENST00000332831.4', 685716))
    print (isoform_map('ENST00000332831.4', 686673))
def search_intervals(starts, ends, position):
    '''
    Find which intervals contain a position.

    starts / ends: parallel lists; the intervals are zip(starts, ends)
    and starts must be sorted in ascending order.
    position: the value to locate.
    Returns the indexes of the intervals that contain position
    (an empty tuple/list when none do).
    '''
    assert len(starts) == len(ends)
    if position < starts[0]:
        return () ## Too small position
    if position > ends[-1]:
        return () ## Beyond the end of the last interval
    # Index of the first start strictly greater than position: intervals
    # to the right of it start after position and can never match.
    start_position = bisect.bisect_right(starts, position)
    # We do know that all positions right from start_position are not suitable
    #print (start_position)
    if start_position == 0:
        return () ## Too small position
    if start_position == len(starts):
        start_position -= 1
    ret = []
    offset = 0
    # Scan leftwards from the insertion point collecting matches.
    # NOTE(review): a non-matching interval only stops the scan when
    # offset >= 1; the single miss right at the insertion point is
    # tolerated. Layouts with longer runs of short intervals before an
    # enclosing long one may therefore be missed - confirm intent.
    while True:
        current_pos = start_position-offset
        if current_pos < 0:
            break
        current_start = starts[current_pos]
        current_end = ends[current_pos]
        if current_start <= position <= current_end:
            ret.append(current_pos)
        else:
            if offset>=1:
                break
        offset += 1
    return ret
def test_search_intervals():
    '''
    Eyeball tests for search_intervals: each line prints the computed
    indexes next to the expected ones (index order is not significant).
    '''
    # Non-overlapping intervals
    a = [20,50,80]
    b = [40,60,100]
    print (a)
    print (b)
    print (search_intervals(a,b,55), '1')
    print (search_intervals(a,b,30), '0')
    print (search_intervals(a,b,85), '2')
    print (search_intervals(a,b,15), '<empty>')
    print (search_intervals(a,b,110), '<empty>')
    print (search_intervals(a,b,20), '0')
    print (search_intervals(a,b,40), '0')
    print (search_intervals(a,b,50), '1')
    print (search_intervals(a,b,60), '1')
    print (search_intervals(a,b,80), '2')
    print (search_intervals(a,b,100), '2')
    print (search_intervals(a,b,45), '<empty>', 45)
    print (search_intervals(a,b,70), '<empty>', 70)
    # First two intervals share the same end
    a = [20,50,80]
    b = [60,60,100]
    print (a)
    print (b)
    print (search_intervals(a,b,55), '1,0')
    print (search_intervals(a,b,60), '1,0')
    print (search_intervals(a,b,50), '1,0')
    print (search_intervals(a,b,40), '0')
    print (search_intervals(a,b,20), '0')
    print (search_intervals(a,b,19), '<empty>')
    print (search_intervals(a,b,85), '2')
    print (search_intervals(a,b,80), '2')
    print (search_intervals(a,b,100), '2')
    print (search_intervals(a,b,105), '<empty>')
    # Fully nested / overlapping intervals
    a = [20,50,80]
    b = [100,100,100]
    print (a)
    print (b)
    print (search_intervals(a,b,90), '2,1,0')
    print (search_intervals(a,b,30), '0')
    print (search_intervals(a,b,60), '0,1')
def search_empty_intervals(starts, ends, position):
    '''
    Locate the neighbouring interval indexes around a position that falls
    in no interval.

    Returns (left_index, right_index): the left element is None when the
    position lies before every interval, and the right element is None
    when it lies after the last interval start. ``ends`` is accepted for
    symmetry with search_intervals but is not consulted.
    '''
    idx = bisect.bisect(starts, position)
    if idx == 0:
        return (None, 0)
    if idx == len(starts):
        return (idx - 1, None)
    return (idx - 1, idx)
def index_transcripts():
    '''
    Build per-chromosome, position-sorted indexes from consts['transcripts']
    into consts['starts'], consts['ends'] and consts['transcripts_sorted']
    (three parallel lists per chromosome).
    '''
    print ('Indexing transcripts..')
    records = sorted(
        (info['chr'], info['start'], info['end'], tr_id)
        for tr_id, info in consts['transcripts'].items()
    )
    #consts['all_chromosomes'] = {v['chr'] for k,v in transripts.item()}
    for chrom, tr_start, tr_end, tr_id in records:
        consts['starts'][chrom].append(tr_start)
        consts['ends'][chrom].append(tr_end)
        consts['transcripts_sorted'][chrom].append(tr_id)
    print (' Indexing transcripts DONE.')
def index_genes():
    '''
    Build gene-level indexes in consts:
      * 'genes': gene name -> best (lowest APPRIS) transcript id
      * 'genes_ordered': gene name -> (chr, start, end), ordered by position
      * 'genes_ordered_keys' / 'genes_ordered_values': the same, as lists
    '''
    print ('Indexing genes..')
    transcripts = consts['transcripts']
    # Create a dictionary. Key = gene, values = list of transcripts.
    # A plain loop replaces the original side-effecting list comprehension,
    # which built a throwaway list of Nones just for its .append effects.
    genes = defaultdict(list)
    for transcript_id, info in transcripts.items():
        genes[info['gene']].append(transcript_id)
    genes = {gene: take_best_appris(trs) for gene, trs in genes.items()}
    consts['genes'] = genes
    # Bug: Previous of first gene in chromosome 2 is the last of chromosome 1..
    consts['genes_ordered'] = OrderedDict(sorted(
        ((gene, (transcripts[tr]['chr'], transcripts[tr]['start'], transcripts[tr]['end']))
         for gene, tr in genes.items()),
        key=lambda x: x[1]))
    consts['genes_ordered_keys'] = list(consts['genes_ordered'].keys())
    consts['genes_ordered_values'] = list(consts['genes_ordered'].values())
    print (' Indexing genes DONE.')
def search_transcripts(chromosome, position):
    '''
    Get all the transcripts overlapping a position.

    Returns (found, transcripts):
      * (True, [ids..]) when the position falls inside transcripts;
      * (False, [left_or_None, right_or_None]) with the flanking
        transcripts otherwise.

    print(search_transcripts(686655)) # (True, ['ENST00000332831.4'])
    print(search_transcripts(500000)) # (False, ['ENST00000426406.3', 'ENST00000332831.4'])
    print(search_transcripts(50000)) # (False, [None, 'ENST00000335137.4'])
    '''
    found_intervals = search_intervals(consts['starts'][chromosome], consts['ends'][chromosome], position)
    if found_intervals:
        return True, [consts['transcripts_sorted'][chromosome][x] for x in found_intervals]
    intervals = search_empty_intervals(consts['starts'][chromosome], consts['ends'][chromosome], position)
    # BUG FIX: 'if x' treated the valid index 0 as "no neighbour" (see the
    # third docstring example, which expects the transcript at index 0);
    # only None marks a missing neighbour.
    return False, [consts['transcripts_sorted'][chromosome][x] if x is not None else None for x in intervals]
#======================QUESTIONS============================ | |
def configure_regexp():
    '''
    Build the master regular expression that extracts mutation
    descriptions (rs ids, HGVS notation, or chromosome/position pairs)
    from a free-text question and store it, compiled, in
    consts['mut_regexp'].
    '''
    # Transcript identifiers: RefSeq (NM_...) and Ensembl (ENST...),
    # each with or without a version suffix.
    refseq = r'NM_[\d]+\.[\d]+|NM_[\d]+'
    ensembl = r'ENST[\d]+\.[\d]+|ENST[\d]+'
    chromosome = r'1|2|3|4|5|6|7|8|9|10|11|12|13|14|15|16|17|18|19|20|21|22|x|y|X|Y'
    # An HGVS prefix is a refseq/ensembl id, a gene symbol or a chromosome
    transcript = r'(?P<refseq>{})|(?P<ensembl>{})|(?P<gene>[\w]+)|(?P<hgvs_chromosome>{})'.format(refseq, ensembl, chromosome)
    chromosome_decl = r'chromosome|chr'
    split_chr_position = r':|,|and[\s]+position|and'
    rs = r'rs[\d]+'
    mutation_change = r'[ACGT]>[ACGT]'
    chr_mutation = r'({})?[\s]*(?P<chromosome>{})[\s]*({})[\s]*(?P<position>[\d]+)[\s]*(?P<change>{})?'.format(chromosome_decl, chromosome, split_chr_position, mutation_change) # chr 1 : 234234234
    hgvs_mutation = r'({}):[cg]\.[\d]+({})'.format(transcript, mutation_change)
    mutation=r'(?P<rs>{})|(?P<hgvs_mutation>{})|(?P<chr_mutation>{})'.format(rs, hgvs_mutation, chr_mutation)
    #print (mutation)
    mut_reg = re.compile(mutation)
    consts['mut_regexp'] = mut_reg
    if False: # To test: sample inputs for every supported notation
        m = mut_reg.search('rs12345')
        print(m.groupdict())
        m = mut_reg.search('1:123')
        print(m.groupdict())
        m = mut_reg.search('chr1:123')
        print(m.groupdict())
        m = mut_reg.search('chromosome 1:123')
        print(m.groupdict())
        m = mut_reg.search('chromosome 1 and position 123')
        print(m.groupdict())
        m = mut_reg.search('chromosome 1 and 123')
        print(m.groupdict())
        m = mut_reg.search('NM_234234:c.100A>G')
        print(m.groupdict())
        m = mut_reg.search('ENST00022222:c.100A>G')
        print(m.groupdict())
        m = mut_reg.search('2:g.100A>G')
        print(m.groupdict())
        m = mut_reg.search('chr2:g.100A>G')
        print(m.groupdict())
def get_location_from_regexp_match(match):
    '''
    Convert a groupdict produced by consts['mut_regexp'] into a
    (location, change) pair, where location is
    {'chromosome': .., 'position': ..} and change is e.g. 'A>G' (or None).

    python project.py --question " rs58991260 " --> Chromosome: 1 Position: 218458480 . belongs in between transcripts
    python project.py --question " rs262656 "
    python project.py --question " rs780472451 " --> Coding Sequence Variant
    python project.py --question " 1:2157315 "
    python project.py --question " AGT:c.803T>C "
    python project.py --question " NM_003193.4:c.464T>A "
    python project.py --question " NM_003002.3:c.274G>T "
    python project.py --question " ENST00000003084:c.1431T>G "
    '''
    #print (match)
    if match['rs']:
        # dbSNP id: resolve through myvariant.info
        location, change = myvariant(match['rs'], task='fetch_location_of_dbsnp')
        return location, change
    if match['chr_mutation']:
        # Plain chromosome/position, optionally with a REF>ALT change
        location = {'chromosome': match['chromosome'], 'position': int(match['position'])}
        change = match['change']
        return location, change  # BUG FIX: was 'chang' (NameError at runtime)
    if match['hgvs_mutation']:
        # There are three possibilities: GENE, ENSEMBL, REFSEQ
        if match['gene']:
            location, change = vep(match['hgvs_mutation'], task="fetch_location_from_HGVS") # AGT:c.803T>C
            return location, change
        elif match['refseq']:
            location, change = mutalyzer(match['hgvs_mutation'], task='') # NM_003002.3:c.274G>T
            return location, change
        elif match['ensembl']:
            location, change = vep(match['hgvs_mutation'], task="fetch_location_from_HGVS") # ENST00000003084:c.1431T>G
            return location, change
        else:
            raise Exception('This should never happen')
    else:
        raise Exception('Could not extract location from match')
def answer(question):
    '''
    Answer a free-text question about a gene / transcript / mutation,
    e.g. "rs58991260 . in chromosome 1".
    All output lines are collected in a list and printed at the end.
    '''
    genes_ordered_values = consts['genes_ordered_values']
    genes_ordered_keys = consts['genes_ordered_keys']
    transcripts = consts['transcripts']
    print ('The question asked is:')
    print ('--> ' + question)
    to_print = []
    gene = None
    transcript = None
    primary_transcript = None
    location = None
    change = None
    # Is a gene mentioned explicitly ("gene XYZ")?
    m = re.search(r'gene[\s]+([\w]+)', question)
    if m:
        gene = m.group(1)
        gene = gene.upper()
        if gene in consts['genes']:
            transcript = consts['genes'][gene]
            primary_transcript = transcript
        else:
            gene = None  # mentioned name is not a known gene
    #Check if there is a location in the question.
    question_locations = consts['mut_regexp'].search(question)
    if question_locations:
        question_locations_d = question_locations.groupdict()
        location, change = get_location_from_regexp_match(question_locations_d)
        # Search for transcripts
        found, transcripts_found = search_transcripts(**location)
        if not found: # rs58991260
            to_print.append('Position of : {} does not belong in any transcript'.format(question_locations.group(0)))
            to_print.append('It belongs in between these transcripts')
            # NOTE(review): transcripts_found may contain None at the
            # chromosome boundaries (see search_transcripts) - join would
            # raise TypeError then; confirm handling.
            to_print.append(' '.join(transcripts_found))
        else: #rs262656
            to_print.append('Found the following transcripts:')
            for x in transcripts_found:
                to_print.append(' {} --> Gene:{} appris: {}'.format(x, transcripts[x]['gene'], transcripts[x]['appris']))
            primary_transcript = take_best_appris(transcripts_found)
            gene = [transcripts[x]['gene'] for x in transcripts_found if x==primary_transcript][0]
            transcript = primary_transcript
    if location:
        to_print.append('Chromosome: {chromosome} Position: {position}'.format(**location))
        # Check # myvariant(location={'chromosome':9, 'position':107620835 }, task="fetch_variants_from_location")
        # Get all HGVS and rs from this location
        # (copy.copy because myvariant mutates the location dict in place)
        myvariant_variants = myvariant(location=copy.copy(location), task="fetch_variants_from_location")
        to_print.append('Found variants on this location:')
        for x in myvariant_variants:
            to_print.append(' {variant} --> {rs}'.format(**x))
        # Show the reference sequence +/- 50 bases around the position
        position_from = max(0, int(location['position'])-50)
        position_to = int(location['position']) + 50
        sequence = ensembl_region(location['chromosome'], position_from, position_to)
        to_print.append('Reference Sequence: {} FROM:{}(-50) TO: {}(+50)'.format(location['chromosome'], position_from, position_to))
        to_print.append(sequence)
    if location and change:
        to_print.append(myvariant(variant={
            'chromosome':location['chromosome'],
            'position': location['position'],
            'change':change}, task="fetch_info_from_variant"))
    if gene:
        # General gene-level information from mygene.info
        to_print.append('Gene: {}'.format(gene))
        gene_info = mygene(gene, 'get_info')
        to_print.append('Gene name: {}'.format(gene_info['name']))
        to_print.append('Type of gene: {}'.format(gene_info['type_of_gene']))
        to_print.append('Other Gene names:')
        to_print.append(gene_info['other_names'])
        to_print.append('Summary:')
        to_print.append(gene_info['summary'])
        to_print.append('Gene location information:')
        to_print.append(gene_info['location'])
        to_print.append('Refseq Translation:')
        to_print.append(gene_info['refseq'])
        to_print.append('Pathways:')
        to_print.append(gene_info['pathway'])
        to_print.append('This gene also exists on the following organisms:')
        to_print.append(gene_info['taxonomy'])
    if primary_transcript:
        to_print.append('Primary Transcript: {}'.format(primary_transcript))
    #if any((x in question for x in ['upstream', 'downstream', 'closer', 'closest'])) and gene:
    if gene:
        # Show up to 3 genes on each side of this gene (positional order)
        location_index = consts['genes_ordered_keys'].index(gene)
        decorate_with_stars = lambda x,right : '*' + x + '*' if x==right else x
        docorate_with_positions = lambda i : int_commas(genes_ordered_values[i][1]) + ' ' + decorate_with_stars(genes_ordered_keys[i], gene) + ' ' + int_commas(genes_ordered_values[i][2])
        f = max(0, location_index-3)
        t = min(len(consts['genes']), location_index+3)
        # NOTE(review): range(f, t+1) can reach index len(genes) when the
        # gene is near the end of the list - confirm boundary handling.
        to_show = [docorate_with_positions(i) for i in range(f,t+1)]
        to_print.append('Downstream / Upstream genes:')
        to_print.append((' '*4) + ' --> '.join(to_show))
    if 'transcripts' in question and gene:
        to_print.append('Transcripts of gene: {}'.format(gene))
        tr = [k for k,v in transcripts.items() if v['gene'] == gene]
        for i, x in enumerate(tr):
            if x == primary_transcript:
                to_print.append('{} {} *PRIMARY*'.format(i+1, x))
            else:
                to_print.append('{} {}'.format(i+1, x))
    if transcript:
        to_print.append('Isoform map of {}:'.format(transcript))
        to_print.append(isoform_map(transcript, position = location['position'] if location else None))
    print('\n'.join(to_print))
#================================================== | |
# MyVariant.info | |
# Example of result from myvariant.info from rs58991260
''' | |
{ | |
"max_score": 15.177358, | |
"took": 3, | |
"total": 1, | |
"hits": [ | |
{ | |
"_id": "chr1:g.218631822G>A", | |
"_score": 15.177358, | |
"dbsnp": { | |
"_license": "https://goo.gl/Ztr5rl", | |
"allele_origin": "unspecified", | |
"alleles": [ | |
{ | |
"allele": "G", | |
"freq": 0.9784 | |
}, | |
{ | |
"allele": "A", | |
"freq": 0.02157 | |
} | |
], | |
"alt": "A", | |
"chrom": "1", | |
"class": "SNV", | |
"dbsnp_build": 129, | |
"flags": [ | |
"ASP", | |
"G5", | |
"GNO", | |
"KGPhase1", | |
"KGPhase3", | |
"SLO" | |
], | |
"gmaf": 0.02157, | |
"hg19": { | |
"end": 218631822, | |
"start": 218631822 | |
}, | |
"ref": "G", | |
"rsid": "rs58991260", | |
"validated": false, | |
"var_subtype": "ts", | |
"vartype": "snp" | |
} | |
} | |
] | |
} | |
''' | |
def json_pprint(j):
    '''
    Pretty-print a JSON-serializable object with 4-space indentation.
    '''
    formatted = json.dumps(j, indent=4)
    print (formatted)
def myvariant(variant=None, location=None, task=None):
    '''
    Query the MyVariant.info REST service.

    task selects the action:
      * 'fetch_location_of_dbsnp': variant is an rs id; returns
        ({'chromosome': .., 'position': <hg38>}, 'REF>ALT').
      * 'fetch_info_from_variant': variant is a dict with chromosome /
        position (hg38) / change; returns a human-readable summary string.
        NOTE: mutates variant['position'] to hg19 in place.
      * 'fetch_variants_from_location': location is a dict with chromosome /
        position (hg38); returns a list of {'variant': hgvs, 'rs': rsid}.
        NOTE: mutates location['position'] to hg19 in place.

    Example URLs:
    http://myvariant.info/v1/variant/chr9:g.107620835G>A
    http://myvariant.info/v1/variant/chr9:g.107620835G>A?fields=dbsnp.rsid,dbsnp.vartype
    http://myvariant.info/v1/query?q=chr9:107620835&fields=gnomad_genome.af,exac.af,dbnsfp.1000gp3.af
    http://myvariant.info/v1/query?q=rs58991260
    http://myvariant.info/v1/query?q=chr9:107620835&fields=_id,dbsnp.rsid
    myvariant(location={'chromosome':9, 'position':107620835 }, task="fetch_variants_from_location")
    myvariant(variant={'chromosome':19, 'position': 44908684, 'change':'T>C'}, task="fetch_info_from_variant")
    '''
    def lift_hgvs(hgvs):
        # Lift an hg19 HGVS 'g.' string returned by the service to hg38.
        # (Character class fixed: it previously contained a stray '+',
        # and mitochondrial ids like chrMT did not match.)
        s = re.search(r'chr([\dXYMT]+):g\.([\d]+)([ACGT]>[ACGT])', hgvs)
        p = lift(chromosome=s.group(1), position=int(s.group(2)))
        return 'chr{}:g.{}{}'.format(s.group(1), p, s.group(3))
    # ---- Build the request for the selected task ----
    if task == 'fetch_location_of_dbsnp':
        url = 'http://myvariant.info/v1/query'
        parameters = {
            'q': variant,
            'fields': 'dbsnp',
            #'hg38': 'true', # Makes no difference
        }
    elif task == 'fetch_info_from_variant':
        # The service expects hg19 coordinates
        variant['position'] = lift(variant['chromosome'], variant['position'], inverse=True)
        this_variant = 'chr{chromosome}:g.{position}{change}'.format(**variant)
        url = f'http://myvariant.info/v1/variant/{this_variant}'
        parameters = {
            'fields': 'gnomad_genome.af,exac.af,dbnsfp.1000gp3.af,clinvar,snpeff.ann,cadd.polyphen,cadd.sift.cat'
        }
    elif task == 'fetch_variants_from_location':
        url = 'http://myvariant.info/v1/query'
        location['position'] = lift(location['chromosome'], location['position'], inverse=True)
        parameters = {
            'q': 'chr{chromosome}:{position}'.format(**location),
            'fields': '_id,dbsnp.rsid',
            #'hg38': 'true', # No effect ??
        }
    else:
        raise Exception('Unknown task: {}'.format(task))
    r = requests.get(url, params=parameters)
    if not r.ok:
        r.raise_for_status()
    j = r.json()
    #json_pprint(j)
    # ---- Decode the response ----
    if task == 'fetch_location_of_dbsnp':
        chromosome = j['hits'][0]["dbsnp"]["chrom"]
        position_19 = j['hits'][0]["dbsnp"]["hg19"]["start"]
        position_38 = lift(chromosome, position_19)
        change = '{}>{}'.format(j['hits'][0]['dbsnp']['ref'], j['hits'][0]['dbsnp']['alt'])
        return {'chromosome': chromosome, 'position': position_38}, change
    elif task == 'fetch_info_from_variant':
        ret = []
        if 'dbnsfp' in j:
            if '1000gp3' in j['dbnsfp']:
                if 'af' in j['dbnsfp']['1000gp3']:
                    ret.append('1000 Genomes Allele Frequency: {}'.format(j['dbnsfp']['1000gp3']['af']))
        if 'exac' in j:
            if 'af' in j['exac']:
                ret.append('ExAC Allele Frequency: {}'.format(j['exac']['af']))
        if 'gnomad_genome' in j:
            # BUG FIX: the outer check used to test membership in
            # j['gnomad_genome']['af'] (KeyError when 'af' is absent,
            # the same condition duplicated twice); it must first check
            # that 'af' exists in j['gnomad_genome'].
            if 'af' in j['gnomad_genome']:
                if 'af' in j['gnomad_genome']['af']:
                    ret.append('gnomAD Allele Frequency: {}'.format(j['gnomad_genome']['af']['af']))
        if 'clinvar' in j:
            if 'rcv' in j['clinvar']:
                ret.append('Clinvar:')
                ret.extend([' {} --> {} --> {} '.format(x['clinical_significance'], x['conditions']['name'], x['review_status']) for x in j['clinvar']['rcv']])
        if 'snpeff' in j:
            if 'ann' in j['snpeff']:
                ret.append('snpeff:')
                # 'ann' is a list when there are several annotations,
                # and a single dict otherwise
                if type(j['snpeff']['ann']) is list:
                    #json_pprint(j['snpeff']['ann'])
                    ret.extend([' {}:{} --> {}'.format(x['feature_id'], x['hgvs_c'], x['effect']) for x in j['snpeff']['ann']])
                else:
                    if 'effect' in j['snpeff']['ann']:
                        ret.append(' {}:{} --> {}'.format(j['snpeff']['ann']['feature_id'], j['snpeff']['ann']['hgvs_c'], j['snpeff']['ann']['effect']))
        if 'cadd' in j:
            if 'polyphen' in j['cadd']:
                if 'cat' in j['cadd']['polyphen']:
                    ret.append('polyphen category: {}'.format(j['cadd']['polyphen']['cat']))
            if 'sift' in j['cadd']:
                if 'cat' in j['cadd']['sift']:
                    ret.append('SIFT category: {}'.format(j['cadd']['sift']['cat']))
        ret = '\n'.join(ret)
        return ret
    elif task == 'fetch_variants_from_location':
        # http://myvariant.info/v1/query?q=chr9:107620835&fields=_id,dbsnp.rsid
        return [{'variant': lift_hgvs(x['_id']), 'rs': x.get('dbsnp', {'rsid': None})['rsid']} for x in j['hits']]
    else:
        raise Exception('Unknown task: {}'.format(task))
#================End of MyVariant.info ############# | |
#================================================== | |
#==============VEP ===================== | |
''' | |
[ | |
{ | |
"id": "AGT:c.803T>C", | |
"seq_region_name": "1", | |
"strand": -1, | |
"colocated_variants": [ | |
{ | |
"start": "230710048", | |
"strand": "1", | |
"seq_region_name": "1", | |
"end": "230710048", | |
"allele_string": "HGMD_MUTATION", | |
"phenotype_or_disease": "1", | |
"id": "CM920010" | |
}, | |
{ | |
"strand": "1", | |
"seq_region_name": "1", | |
"start": "230710048", | |
"minor_allele": "A", | |
"phenotype_or_disease": "1", | |
"minor_allele_freq": "0.2949", | |
"frequencies": { | |
"C": { | |
"gnomad_sas": "0.6204", | |
"eas": "0.8532", | |
"amr": "0.6354", | |
"gnomad_nfe": "0.4202", | |
"gnomad_asj": "0.4391", | |
"gnomad_oth": "0.5164", | |
"aa": "0.8268", | |
"gnomad_amr": "0.7185", | |
"gnomad_fin": "0.4394", | |
"afr": "0.9032", | |
"ea": "0.4258", | |
"sas": "0.636", | |
"gnomad": "0.5464", | |
"gnomad_afr": "0.8485", | |
"gnomad_eas": "0.8378", | |
"eur": "0.4115" | |
} | |
}, | |
"clin_sig": [ | |
"benign", | |
"risk_factor" | |
], | |
"id": "rs699", | |
"end": "230710048", | |
"pubmed": [ | |
"21988197", | |
"9831339", | |
"20486282", | |
"21681796" | |
], | |
"allele_string": "A/G" | |
} | |
], | |
"most_severe_consequence": "missense_variant", | |
"input": "AGT:c.803T>C", | |
"allele_string": "T/C", | |
"transcript_consequences": [ | |
{ | |
"cds_start": 803, | |
"polyphen_prediction": "benign", | |
"sift_prediction": "tolerated", | |
"sift_score": 1, | |
"gene_symbol": "AGT", | |
"biotype": "protein_coding", | |
"gene_id": "ENSG00000135744", | |
"protein_end": 268, | |
"cdna_start": 1018, | |
"polyphen_score": 0, | |
"hgnc_id": "HGNC:333", | |
"amino_acids": "M/T", | |
"consequence_terms": [ | |
"missense_variant" | |
], | |
"codons": "aTg/aCg", | |
"protein_start": 268, | |
"variant_allele": "C", | |
"cds_end": 803, | |
"impact": "MODERATE", | |
"strand": -1, | |
"transcript_id": "ENST00000366667", | |
"gene_symbol_source": "HGNC", | |
"cdna_end": 1018 | |
}, | |
{ | |
"variant_allele": "C", | |
"transcript_id": "ENST00000412344", | |
"gene_id": "ENSG00000244137", | |
"strand": -1, | |
"impact": "MODIFIER", | |
"distance": 650, | |
"gene_symbol_source": "Clone_based_ensembl_gene", | |
"gene_symbol": "AL512328.1", | |
"consequence_terms": [ | |
"downstream_gene_variant" | |
], | |
"biotype": "antisense" | |
} | |
], | |
"end": 230710048, | |
"assembly_name": "GRCh38", | |
"start": 230710048 | |
} | |
] | |
''' | |
def vep(variant, task):
    '''
    Query the Ensembl VEP REST endpoint for an HGVS variant.

    https://rest.ensembl.org/documentation/info/vep_hgvs_get
    Example variant: AGT:c.803T>C

    Parameters:
        variant: an HGVS expression (e.g. 'AGT:c.803T>C')
        task: only 'fetch_location_from_HGVS' is supported

    Returns:
        ({'chromosome': ..., 'position': ...}, allele change string)

    Raises:
        requests.HTTPError on a failed request,
        Exception on an unsupported task.
    '''
    server = "https://rest.ensembl.org"
    ext = "/vep/human/hgvs/{}?".format(variant)
    parameters = {
        "content-type": "application/json"
    }
    r = requests.get(server+ext, headers={ "Content-Type" : "application/json"}, params=parameters)
    if not r.ok:
        r.raise_for_status()
    decoded = r.json()
    #json_pprint(decoded)
    # The first hit carries the mapped genomic coordinates (GRCh38).
    chromosome = decoded[0]['seq_region_name']
    position = int(decoded[0]['start'])
    change = decoded[0]['allele_string']
    if task == 'fetch_location_from_HGVS':
        location = {'chromosome': chromosome, 'position': position}
        return location, change
    else:
        # Typo fix: was 'Uknown task'; now consistent with the other
        # 'Unknown task' messages in this module.
        raise Exception('Unknown task: {}'.format(task))
#==========END OF VEP ================== | |
### ensembl region ### | |
def ensembl_region(chromosome, f, t):
    '''
    Fetch the reference sequence for a genomic region from Ensembl.

    Example:
        ensembl_region('X', 1000000, 1000100)
        https://rest.ensembl.org/sequence/region/human/X:1000000..1000100:1?content-type=text/plain

    Parameters:
        chromosome: chromosome name (e.g. 'X', '1')
        f: region start position (inclusive)
        t: region end position; must be strictly greater than f

    Returns the sequence as plain text.
    '''
    if t <= f:
        # Bug fix: the original message had the comparison backwards
        # ("from should be greater than to") -- it is 'to' that must
        # be greater than 'from'.
        raise Exception("'to' ({}) should be greater than 'from' ({})".format(t, f))
    url = f'https://rest.ensembl.org/sequence/region/human/{chromosome}:{f}..{t}:1'
    parameters = {
        'content-type' : 'text/plain',
    }
    r = requests.get(url, params=parameters)
    if not r.ok:
        r.raise_for_status()
    # Return the response text directly instead of shadowing parameter 't'.
    return r.text
### End of ensebl region | |
########### mutalyzer ############## | |
def mutalyzer(variant, task):
    '''
    Convert an HGVS transcript variant to hg38 genomic coordinates via
    the Mutalyzer position converter.

    https://mutalyzer.nl/soap-api
    https://mutalyzer.nl/json/runMutalyzer?variant=NM_003193.4:c.464T%3EA
    Example variants:
        NM_003193.4:c.464T>A
        NM_003002.3:c.274G>T
    https://mutalyzer.nl/json/numberConversion?variant=NM_003002.3:c.274G%3ET&build=hg38

    NOTE(review): 'task' is currently unused; kept for interface
    consistency with the other service wrappers in this file.

    Returns:
        ({'chromosome': ..., 'position': ...}, change string like 'G>T')
    '''
    url='https://mutalyzer.nl/json/numberConversion'
    parameters = {
        "variant" : variant,
        "build" : "hg38",
    }
    r = requests.get(url, params=parameters)
    if not r.ok:
        r.raise_for_status()
    decoded = r.json() # e.g. NC_000011.10:g.112088971G>T
    # Only single-nucleotide substitutions are matched here.
    m = re.search(r'NC_([\d]+)\.[\d]+:g\.([\d]+)([ACGT]>[ACGT])', decoded[0])
    chromosome = int(m.group(1))
    # Bug fix: the sex chromosomes are encoded as NC_000023 (X) and
    # NC_000024 (Y). Reporting them as integers 23/24 would break
    # downstream coordinate lifting, which expects 'X'/'Y'.
    chromosome = {23: 'X', 24: 'Y'}.get(chromosome, chromosome)
    position = int(m.group(2))
    change = m.group(3)
    location = {'chromosome': chromosome, 'position': position}
    return location, change
########### end of mutalyzer ######## | |
############ mygene.info ####################### | |
def mygene_taxonomy(gene):
    '''
    Look up which model organisms a gene symbol is known in, via the
    MyGene.info query API.

    Parameters:
        gene: gene symbol (e.g. 'TPMT')

    Returns a newline-joined string of '  common_name --> species' lines
    for every hit whose taxid is in the known table, or '' if none match.
    '''
    # Taxonomy id -> (common name, binomial name) for the organisms we report.
    tax_names = {
        9606 : ('human', 'Homo sapiens'),
        10090 : ('mouse', 'Mus musculus'),
        10116 : ('rat', 'Rattus norvegicus'),
        7227 : ('fruitfly', 'Drosophila melanogaster'),
        6239 : ('nematode', 'Caenorhabditis elegans'),
        7955 : ('zebrafish', 'Danio rerio'),
        3702 : ('thale-cress', 'Arabidopsis thaliana'),
        8364 : ('frog', 'Xenopus tropicalis'),
        9823 : ('pig', 'Sus scrofa'),
    }
    url = 'http://mygene.info/v3/query'
    parameters = {
        'q': "symbol:{}".format(gene),
        'fields': 'taxid'
    }
    r = requests.get(url, params = parameters)
    if not r.ok:
        # Bug fix: was 'raise r.raise_for_status()'. raise_for_status()
        # raises on its own; 'raise' on its None return value would be a
        # TypeError. Now consistent with the other wrappers in this file.
        r.raise_for_status()
    decoded = r.json()
    #json_pprint(decoded)
    # Keep only hits whose taxid we know a name for.
    tax = list(filter(bool, [tax_names.get(int(x['taxid']), False) for x in decoded['hits']]))
    if tax:
        ret = '\n'.join(['  {} --> {}'.format(*x) for x in tax])
        return ret
    return ''
def mygene(gene, task):
    '''
    Fetch summary information for a human gene from MyGene.info.

    By default coordinates are in hg38.
    All fields:
    http://docs.mygene.info/en/latest/doc/data.html
    http://mygene.info/v3/query?q=symbol:tpmt&fields=name&species=human
    Test with: mygene('TPMT', 'get_names')

    Parameters:
        gene: gene symbol or free-text query
        task: only 'get_info' is supported

    Returns a dict with keys: name, other_names, map_location, location,
    refseq, pathway, summary, taxonomy, type_of_gene.
    '''
    # Build the set of canonical chromosome names once instead of a
    # fresh list on every membership test.
    canonical_chromosomes = frozenset(map(str, range(1, 23))) | {'X', 'Y'}
    def is_mygene_chromosome(x):
        '''
        Exclude things like: "chr": "CHR_HSCHR1_5_CTG32_1"
        '''
        return x.upper() in canonical_chromosomes
    url = 'http://mygene.info/v3/query'
    if task == 'get_info':
        fields = 'name,other_names,map_location,genomic_pos,refseq,pathway,summary,type_of_gene'
    else:
        raise Exception('Unknown task for mygene: {}'.format(task))
    parameters = {
        'q': gene,
        'species' : 'human',
        'fields': fields,
        #'hg38': 'true',
    }
    r = requests.get(url, params=parameters)
    if not r.ok:
        r.raise_for_status()
    decoded = r.json()
    #json_pprint(decoded)
    first_hit = decoded['hits'][0]
    if type(first_hit['genomic_pos']) is list:
        chromosomes = {x['chr'] for x in first_hit['genomic_pos'] if is_mygene_chromosome(x['chr'])}
        # Robustness fix: was a bare assert (stripped under 'python -O').
        # Can it have more than one chromosome?
        if len(chromosomes) != 1:
            raise Exception('Expected exactly one canonical chromosome, got: {}'.format(sorted(chromosomes)))
        chromosome = list(chromosomes)[0]
        location = '\n'.join(["Ensemble Gene: {ensemblgene}   Chromosome: {chr} Start: {start} End:{end}".format(**x) for x in first_hit['genomic_pos'] if x['chr'] == chromosome])
    else:
        chromosome = first_hit['genomic_pos']['chr']
        location = "Ensemble Gene: {ensemblgene}   Chromosome: {chr} Start: {start} End:{end}".format(**first_hit['genomic_pos'])
    # Create refseq: one 'rna --> protein' line per translation pair.
    refseq = '\n'.join(["   {rna} --> {protein} ".format(**x) for x in first_hit['refseq']['translation']])
    # Pathway: concatenate entries from every pathway database present.
    pathway = ''
    for pathway_db in ['kegg', 'pharmgkb', 'reactome', 'smpdb', 'wikipathways']:
        if pathway_db in first_hit['pathway']:
            #print (pathway_db, first_hit['pathway'][pathway_db])
            pathway += pathway_db + '\n'
            if type(first_hit['pathway'][pathway_db]) is list:
                pathway += '\n'.join(["   {id} --> {name}".format(**x) for x in first_hit['pathway'][pathway_db]])
            else:
                pathway += "   {id} --> {name}".format(**first_hit['pathway'][pathway_db])
    ret = {
        'name' : first_hit['name'],
        'other_names' : '\n'.join(map(lambda x: '   ' + x, first_hit['other_names'])),
        'map_location': first_hit['map_location'],
        'location' : location,
        'refseq' : refseq,
        'pathway': pathway,
        'summary': first_hit['summary'],
        'taxonomy': mygene_taxonomy(gene),
        'type_of_gene': first_hit['type_of_gene']
    }
    return ret
############ end of mygene.info ################ | |
def report_errors():
    '''Print every accumulated error counter from consts, if any.'''
    errors = consts['errors']
    if not errors:
        return
    print ('Errors:')
    for message_key, count in errors.items():
        # Each counter key maps to a human-readable message template.
        print (consts['messages'][message_key], count)
if __name__ == '__main__':
    '''
    rs429358 . APOE
    '''
    # Build the in-memory annotation state before answering any question:
    # compile the question-matching regular expressions, parse the GENCODE
    # GFF3 annotation file, then index transcripts and genes for lookup.
    configure_regexp()
    parse_gff3(consts['gencode'])
    index_transcripts()
    index_genes()
    # Command-line interface: a single optional free-text question.
    parser = argparse.ArgumentParser(description='The best python project ever')
    parser.add_argument('-q', '--question', help='Your question in english', required=False)
    args = parser.parse_args()
    question = args.question
    if question:
        answer(question)
    # Always print a summary of errors accumulated during parsing/answering.
    report_errors()
    #test_isoform_map()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment