Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Identify Ensembl release versions of Ensembl transcript/gene/protein IDs. Details at https://blog.liang2.tw/posts/2020/06/identify-ensembl-release-ver/
"""Find the possible Ensembl releases of the given IDs.
The script uses Ensembl Tark APIs to subset the possible Ensembl releases
that cover all the given Ensembl IDs. Usually it can pinpoint the right release
using less than 30 IDs. Feeding more IDs may exceed the API call rate limit.
Known issues:
* The API doesn't handle ENSGR (chrY PAR genes)
"""
import argparse
import asyncio
import logging
import re
import aiohttp
logger = logging.getLogger(__name__)
async def get_tx_alive_releases(tx_id_version, session):
resp = await session.get(
'http://dev-tark.ensembl.org/api/transcript/stable_id_with_version/',
params={
'stable_id_with_version': tx_id_version,
'expand': 'transcript_release_set'
}
)
j = await resp.json()
if j['next'] is not None:
logger.warning(f'{tx_id_version} has paged results')
alive_releases = set()
for result in j['results']:
for release in result['transcript_release_set']:
if release['source'] != 'Ensembl':
continue
release_ver = int(release['shortname'])
alive_releases.add(release_ver)
return alive_releases
async def get_gene_alive_releases(gene_id_version: str, session: aiohttp.ClientSession):
m = re.match(r'^(?P<stable_id>ENSG\d+)\.(?P<stable_id_version>\d+)$', gene_id_version)
if m is None:
logger.error(f'Skipped invalid gene ID: {gene_id_version}')
return None
resp = await session.get(
'http://dev-tark.ensembl.org/api/gene/',
params={
**m.groupdict(),
'expand': 'gene_release_set'
}
)
j = await resp.json()
if j['next'] is not None:
logger.warning(f'{gene_id_version} has paged results')
alive_releases = set()
for result in j['results']:
for release in result['gene_release_set']:
if release['source'] != 'Ensembl':
continue
release_ver = int(release['shortname'])
alive_releases.add(release_ver)
return alive_releases
async def get_protein_alive_releases(protein_id_version: str, session: aiohttp.ClientSession):
m = re.match(r'^(?P<stable_id>ENSP\d+)\.(?P<stable_id_version>\d+)$', protein_id_version)
if m is None:
logger.error(f'Skipped invalid gene ID: {protein_id_version}')
return None
resp = await session.get(
'http://dev-tark.ensembl.org/api/translation/',
params={
**m.groupdict(),
'expand': 'translation_release_set'
}
)
j = await resp.json()
if j['next'] is not None:
logger.warning(f'{protein_id_version} has paged results')
alive_releases = set()
for result in j['results']:
for release in result['translation_release_set']:
if release['source'] != 'Ensembl':
continue
release_ver = int(release['shortname'])
alive_releases.add(release_ver)
return alive_releases
async def find_alive_release(ens_id, session):
"""Dispatch ID query based on its type."""
if ens_id.startswith('ENST'):
releases = await get_tx_alive_releases(ens_id, session)
elif ens_id.startswith('ENSG'):
releases = await get_gene_alive_releases(ens_id, session)
elif ens_id.startswith('ENSP'):
releases = await get_protein_alive_releases(ens_id, session)
else:
releases = None
return ens_id, releases
async def get_supported_releases(session):
resp = await session.get('http://dev-tark.ensembl.org/api/release/nopagination')
j = await resp.json()
all_releases = sorted([
int(release['shortname']) for release in j
if release['source'] == 'Ensembl'
])
return all_releases
def compress_release_repr(releases):
"""Compress the representation of releases."""
compressed_repr = []
start = releases[0]
distance = 0
for res in releases:
if res != start + distance:
prev = start + distance - 1
compressed_repr.append(f'{start}{prev}')
start = res
distance = 1
else:
distance += 1
if res == releases[-1]:
compressed_repr.append(f'{start}{res}')
return compressed_repr
async def main(ensembl_ids):
logger.info(f'Querying for {len(ensembl_ids)} Ensembl IDs')
possible_releases = None
# Limit concurrent API calls
conn = aiohttp.TCPConnector(limit_per_host=5)
async with aiohttp.ClientSession(connector=conn) as session:
# Get the range of supported releases
supported_releases = await get_supported_releases(session)
supported_releases_repr = ', '.join(compress_release_repr(supported_releases))
logger.info(
f'Only Ensembl releases {supported_releases_repr} are supported '
f'by Ensembl Tark. IDs outside the range may not be identified.'
)
# Query the IDs
tasks = [find_alive_release(ens_id, session) for ens_id in ensembl_ids]
for res in asyncio.as_completed(tasks):
ens_id, releases = await res
if releases is None or not releases:
logger.error(f'{ens_id} is not in any possible release')
continue
if possible_releases is None:
possible_releases = releases
else:
possible_releases.intersection_update(releases)
alive_release_str = ', '.join(compress_release_repr(sorted(releases)))
print(f'{ens_id} in Ensembl releases {alive_release_str}')
possible_releases = [str(res) for res in sorted(possible_releases)]
print(f'Possible Ensembl releases are: '
f'{", ".join(possible_releases)}')
def setup_cli():
# Setup console logging
console = logging.StreamHandler()
all_loggers = logging.getLogger()
all_loggers.setLevel(logging.INFO)
all_loggers.addHandler(console)
log_fmt = '[%(asctime)s][%(levelname)-7s] %(message)s'
log_formatter = logging.Formatter(log_fmt, '%Y-%m-%d %H:%M:%S')
console.setFormatter(log_formatter)
# Setup CLI parser
parser = argparse.ArgumentParser(
description=__doc__,
formatter_class=argparse.RawDescriptionHelpFormatter
)
parser.add_argument('ids_list', help="Path to list of Ensembl IDs")
return parser
if __name__ == "__main__":
parser = setup_cli()
args = parser.parse_args()
with open(args.ids_list) as f:
ensembl_ids = [line.strip() for line in f.read().splitlines() if line]
asyncio.run(main(ensembl_ids))
"""Find the possible Ensembl releases of the given IDs.
The script uses Ensembl Tark APIs to subset the possible Ensembl releases
that cover all the given Ensembl IDs. Usually it can pinpoint the right release
using less than 30 IDs. Feeding more IDs may exceed the API call rate limit.
Known issues:
* The API doesn't handle ENSGR (chrY PAR genes)
"""
import argparse
import asyncio
from collections import Counter, defaultdict
import logging
import aiohttp
logger = logging.getLogger(__name__)
async def get_alive_releases(protein_id_version: str, session: aiohttp.ClientSession):
resp = await session.get(
'http://dev-tark.ensembl.org/api/transcript/search/',
params={
'identifier_field': protein_id_version,
'expand': 'transcript_release_set'
}
)
j = await resp.json()
alive_releases = set()
for result in j:
if result['assembly'] != 'GRCh38':
continue
# tx_id = f"{result['stable_id']}.{result['stable_id_version']}"
for release in result['transcript_release_set']:
if release['source'] != 'RefSeq':
continue
release_ver = release['shortname']
alive_releases.add(release_ver)
return alive_releases
async def find_alive_release(refseq_id, session):
"""Dispatch ID query based on its type."""
releases = await get_alive_releases(refseq_id, session)
return refseq_id, releases
async def get_supported_releases(session):
resp = await session.get('http://dev-tark.ensembl.org/api/release/nopagination')
j = await resp.json()
all_releases = sorted([
release['shortname'] for release in j
if release['source'] == 'RefSeq'
])
return all_releases
async def main(refseq_ids):
logger.info(f'Querying for {len(refseq_ids)} RefSeq IDs')
possible_releases = None
id_release_map = defaultdict(set)
# Limit concurrent API calls
conn = aiohttp.TCPConnector(limit_per_host=5)
async with aiohttp.ClientSession(connector=conn) as session:
# Get the range of supported releases
supported_releases = await get_supported_releases(session)
logger.info(
f'Only these RefSeq releases are supported by Ensembl Tark:'
f' {", ".join(supported_releases)}.'
f'IDs outside the range may not be identified'
)
# Query the IDs
tasks = [find_alive_release(refseq_id, session) for refseq_id in refseq_ids]
for res in asyncio.as_completed(tasks):
refseq_id, releases = await res
if releases is None or not releases:
logger.error(f'{refseq_id} is not in any possible release')
continue
if possible_releases is None:
possible_releases = Counter(releases)
else:
possible_releases.update(releases)
alive_release_str = ", ".join(sorted(releases))
print(f'{refseq_id} in RefSeq releases {alive_release_str}')
for r in releases:
id_release_map[r].add(refseq_id)
print('Possible RefSeq releases are: ')
all_ids = set(refseq_ids)
for release, count in possible_releases.most_common(10):
missing_ids = all_ids - id_release_map[release]
print(f' {release}: {count:3d}/{len(refseq_ids):3d}. '
f'Missing {", ".join(missing_ids)}')
def setup_cli():
# Setup console logging
console = logging.StreamHandler()
all_loggers = logging.getLogger()
all_loggers.setLevel(logging.INFO)
all_loggers.addHandler(console)
log_fmt = '[%(asctime)s][%(levelname)-7s] %(message)s'
log_formatter = logging.Formatter(log_fmt, '%Y-%m-%d %H:%M:%S')
console.setFormatter(log_formatter)
# Setup CLI parser
parser = argparse.ArgumentParser(
description=__doc__,
formatter_class=argparse.RawDescriptionHelpFormatter
)
parser.add_argument('ids_list', help="Path to list of RefSeq IDs")
return parser
if __name__ == "__main__":
parser = setup_cli()
args = parser.parse_args()
with open(args.ids_list) as f:
refseq_ids = [line.strip() for line in f.read().splitlines() if line]
asyncio.run(main(refseq_ids))
ENST00000426449.4
ENST00000434817.4
ENST00000435221.5
ENST00000451778.4
ENSP00000222254.6
ENSP00000477864.1
ENSP00000484714.1
ENSG00000170266.14
ENSG00000238009.5
ENSG00000230415.1
ENSG00000236335.1
ENSG00000173705.7
NP_001005484.1
NP_001005221.2
NP_689699.2
NP_056473.2
NP_938073.1
NP_001153656.1
NP_115505.2
NP_001278295.1
NP_001278296.1
NP_066993.1
NP_001135939.1
NP_005092.1
NP_001292204.1
NP_940978.2
NP_001192181.1
NP_001317235.1
NP_001350454.1
NP_060361.4
NP_001123517.1
NP_694986.2
NP_004186.1
NP_683699.1
NP_683700.1
NP_003318.1
NP_057260.2
NP_057631.1
NP_542172.2
NP_001014980.1
NP_477515.2
NP_919296.1
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.