-
-
Save callahantiff/a28fb3160782f42f104e9ec41553af0d to your computer and use it in GitHub Desktop.
#!/usr/bin/env python | |
# -*- coding: utf-8 -*- | |
# import needed libraries | |
import glob | |
import json | |
import os | |
import pickle | |
import random | |
import requests | |
import time | |
from tqdm import tqdm # type: ignore | |
from typing import Any, Dict, IO, List, Set, Tuple, Union | |
def gets_json_results_from_api_call(url: str, api_key: str) -> Dict: | |
"""Function makes API requests and returns results as a json file. API documentation can be found here: | |
http://data.bioontology.org/documentation. If a 500 HTTP server-side code is return from "status_code" then the | |
algorithm pauses for 90-120 seconds before trying the request again. By default, the process sleeps for 5-20 | |
seconds after each API call. | |
Args: | |
url: A string containing a URL to be run against an API. | |
api_key: A string containing an API key. | |
Return: | |
A json-formatted file containing API results. | |
Raises: | |
An exception is raised if a 500 HTTP server-side code is raised. | |
""" | |
response = requests.get(url, headers={'Authorization': 'apikey token=' + api_key}) | |
time.sleep(random.randint(10, 20)) # ease rate limiting by sleeping for random intervals | |
if response.status_code == 500: | |
time.sleep(random.randint(90, 120)) # ease rate limiting by sleeping for random intervals | |
response = requests.get(url, headers={'Authorization': 'apikey token=' + api_key}) | |
return json.loads(response.text) | |
def writes_data_to_file(file_out: str, results: Set[Tuple[str, Any]]) -> None: | |
"""Function iterates over set of tuples and writes data to text file locally. | |
Args: | |
file_out: A filepath to write data to. | |
results: A set of tuples, where each tuple represents a mapping between two identifiers. | |
Returns: | |
None. | |
""" | |
print('Writing results to {location}'.format(location=file_out)) | |
with open(file_out, 'w') as outfile: | |
for res in results: | |
outfile.write(res[0] + '\t' + res[1] + '\n') | |
outfile.close() | |
return None | |
def processes_api_page_results(content: Dict, source2: str) -> Set: | |
"""Takes a page of API results and processes them to capture and only those mappings that exist between source1 | |
and source2. The method returns the results as a set of tuples. Between each batch the process sleeps for 90-120 | |
seconds to ease the burden on the API. | |
Args: | |
content: A dictionary of API page results. | |
source2: A string naming an ontology. | |
Returns: | |
unique_edges: A set of tuples, where each tuple is an edge representing a mapping between source1 and source2. | |
""" | |
unique_edges = set() | |
if 'collection' not in content.keys(): | |
raise KeyError('Something went wrong: {}'.format(content['error'])) | |
else: | |
for result in content['collection']: | |
if source2 in result['classes'][1]['links']['ontology']: | |
source1_class, source2_class = result['classes'][0]['@id'], result['classes'][1]['@id'] | |
if '.owl' not in source1_class and '.owl' not in source2_class: | |
unique_edges.add((source1_class, source2_class)) | |
return unique_edges | |
def extracts_mapping_data(api_key: str, source1: str, source2: str, file_out: str) -> None: | |
"""Function uses the BioPortal API to retrieve mappings between two sources. The function batch processes the | |
results in chunks of 500, writes the data to a temporary directory and then once all batches have been processed, | |
the data is concatenated into a single file. | |
Args: | |
api_key: A string containing a user BiPortal API key. | |
source1: A string naming a source ontology that you want to map from. | |
source2: A string naming an ontology that you want to map identifiers from source1 to. | |
file_out: A filepath to write data to. | |
Returns: | |
None. | |
""" | |
print('=' * 50 + '\nRetrieving - {src1} - {src2} Mappings\n'.format(src1=source1, src2=source2) + '=' * 50) | |
# get the available resources for mappings to source | |
ont_source = 'http://data.bioontology.org/ontologies/{source}/mappings/'.format(source=source1) | |
api_results = gets_json_results_from_api_call(ont_source, api_key) | |
print('Processing {} Pages of Results'.format(api_results['pageCount'])) | |
# create temp progress directory to store pages | |
temp_progress_storage = '/'.join(file_out.split('/')[:-1]) + '/processed_data' | |
os.mkdir(temp_progress_storage) | |
# batch process api result pages | |
total_pages = list(range(1, int(api_results['pageCount']) + 1)) | |
n = 500 if len(total_pages) > 5000 else 100 | |
batches = [total_pages[i:i + n] for i in range(0, len(total_pages), n)] | |
for batch in range(0, len(batches)): | |
print('\nProcessing batch {} of {}'.format(batch + 1, len(batches) + 1)) | |
page_results = set() | |
for page in tqdm(batches[batch]): | |
content = gets_json_results_from_api_call(ont_source + '?page={page}'.format(page=page), api_key) | |
pickle.dump(content, open(temp_progress_storage + '/page_{}.pkl'.format(str(page)), 'wb')) # temp store pg | |
page_results |= processes_api_page_results(content, source2) | |
writes_data_to_file(file_out + '_{batch_num}'.format(batch_num=batch + 1) + '.txt', page_results) | |
time.sleep(random.randint(30, 60)) # ease rate limiting by sleeping for random intervals | |
return None | |
def main() -> None: | |
api_key = input('Please provide your BioPortal API Key: ') | |
source1 = input('Enter ontology source 1: ').upper() | |
source2 = input('Enter ontology source 2: ').upper() | |
# create temp directory to store batches | |
temp_directory = './resources/processed_data/temp' | |
try: | |
print('Creating a temporary directory to write API results to: {}'.format(temp_directory)) | |
os.mkdir(temp_directory) | |
except FileExistsError: | |
check_input = input('There is already a temp directory, should I overwrite it?: Yes/No') | |
if check_input.lower() == 'yes': | |
os.remove(temp_directory) | |
os.mkdir(temp_directory) | |
else: | |
new_directory = input('Please provide a name for directory to write data to: ') | |
os.mkdir('./resources/processed_data/' + new_directory) | |
# run program to map identifiers between source1 and source2 | |
api_write_location = '/{source1}_{source2}_MAP'.format(source1=source1, source2=source2) | |
extracts_mapping_data(api_key, source1, source2, temp_directory + api_write_location) | |
# concatenate api data stored in temp directory into single file in the temp directory | |
with open(temp_directory + '/{}_{}_MAP.txt'.format(source1.upper(), source2.upper()), 'w') as out: | |
for filename in tqdm(glob.glob(temp_directory + '/*.txt')): | |
for row in list(filter(None, open(filename, 'r').read().split('\n'))): | |
source1_class = '_'.join(row.split('\t')[1].split('/')[-2:]) | |
source2_class = row.split('\t')[0].split('/')[-1] | |
out.write(source1_class + '\t' + source2_class + '\n') | |
out.close() | |
# delete temp progress storage | |
os.remove(temp_directory + '/processed_data') | |
if __name__ == '__main__': | |
main() |
Dear Tiffany,
thanks a lot for your script!
I am trying to get mappings from the bioportal using it, yet I encounter the following error:
_
ncbo_rest_api_orig.py in extracts_mapping_data(api_key, source1, source2, file_out)
118 content = gets_json_results_from_api_call(ont_source + '?page={page}'.format(page=page), api_key)
119 pickle.dump(content, open(temp_progress_storage + '/page_{}.pkl'.format(str(page)), 'wb')) # temp store pg
--> 120 page_results |= processes_api_page_results(content)
ncbo_rest_api_orig.py in processes_api_page_results(content)
75 else:
76 for result in content['collection']:
---> 77 if source2 in result['classes'][1]['links']['ontology']:
78 source1_class, source2_class = result['classes'][0]['@id'], result['classes'][1]['@id']
79 if '.owl' not in source1_class and '.owl' not in source2_class:
NameError: name 'source2' is not defined_
I tried to fix it by passing param source2 from within method extracts_mapping_data() through to
processes_api_page_results(content, source2)
-> then the code runs, but I get a mapping text file such as (mapping UO and AGRO for testing):
obo_UO_0010014 UO_0010014
obo_UO_0010050 UO_0010050
obo_UO_0000039 UO_0000039
obo_UO_1000036 UO_1000036
obo_UO_0010034 UO_0010034
...
I am not a very proficient python coder, my background is molecular biology, so I was wondering whether you could have a quick look?
Thanks a lot!
Hi @hars-j - so sorry for the delay in getting back to you. Are you still stuck on this? I am happy to help troubleshoot. If so, would you mind providing the input statement/code that you are trying to run so I can see the arguments that you are passing?
In the function processes_api_page_results
line 66, update the parameters to the following:
def processes_api_page_results(content: Dict, source2: str) -> Set:
then in the extracts_mapping_data
function update line 129 from:
page_results |= processes_api_page_results(content)
to:
page_results |= processes_api_page_results(content, source2)
That should fix it.
Thanks so much @ejmurray! I agree that is exactly what was needed and per your comment, I have updated the script. Thank you again!
Output:
<<temp_directory>>/CHEBI_MESH_MAP.txt
Sample Output: