Skip to content

Instantly share code, notes, and snippets.

@pvanheus
Created December 9, 2018 21:22
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save pvanheus/f151bc2f7f532d03bd0621bd8dcdbbc5 to your computer and use it in GitHub Desktop.
Save pvanheus/f151bc2f7f532d03bd0621bd8dcdbbc5 to your computer and use it in GitHub Desktop.
download genomes from Genbank, using Python aiohttp
#!/usr/bin/env python
import argparse
import asyncio
from pathlib import Path
import socket
import re
import pandas
from aiohttp import ClientSession, TCPConnector
def assembly_url(assembly_accession: str)-> str:
"""assembly_url - convert a Genbank accession into the URL where its Genbank assembly is stored
assembly_accession: str - Genbank accession of an assembled genome
Format of accession is GXXDDDDDDDDD.VERSION where DDD are groups of 3 numbers"""
assert re.match('^G.._\d+\.\d+$', assembly_accession) is not None, f"Assembly accession {assembly_accession} does not match expected format"
base_url = 'https://ftp.ncbi.nlm.nih.gov/genomes/all/'
assembly_path = assembly_accession[:3] + '/' + '/'.join(
[assembly_accession[n:n + 3] for n in range(4, len(assembly_accession) - 2, 3)])
return base_url + assembly_path
def get_genomes(patric_genomes_filename: str) -> str:
"""get_genomes - Read a PATRIC format genome CSV file and yield the Genbank genome accessions
patric_genomes_filename: str - filename for PATRIC format genome CSV file"""
genome_data = pandas.read_csv(patric_genomes_filename)
for genome_accession_str in genome_data[genome_data['Assembly Accession'].notnull()]['Assembly Accession']:
if "," in genome_accession_str:
accessions = genome_accession_str.split(',')
for accession in accessions:
yield accession.strip()
else:
yield genome_accession_str.strip()
async def get_genome_to_dir(q: asyncio.Queue, output_dir: str):
"""get_genome_to_dir - fetch accessions from queue and download Genbank genome
q: asyncio.Queue - queue to fetch genome accessions from (None implies work finished)
output_dir: directory to write genome FASTA files to"""
output_dir_path = Path(output_dir)
if not output_dir_path.is_dir():
raise IOError(f"{output_dir} does not refer to a writeable directory")
while True:
accession = await q.get()
if accession is None:
await q.put(None)
break
else:
url = assembly_url(accession)
async with ClientSession(connector=TCPConnector(family=socket.AF_INET)) as session:
async with session.get(url) as response:
if response.status == 200:
response_text = await response.text()
match = re.search(r'href="(G[^"]+)"', response_text)
if match is not None:
final_url = url + '/' + match.group(1) + match.group(1)[:-1] + "_genomic.fna.gz"
async with session.get(final_url) as response:
if response.status == 200:
output_path = output_dir_path / f'{accession}.fasta.gz'
with output_path.open('wb') as output_file:
output_file.write(await response.read())
print(output_path)
else:
await q.put(accession)
else:
await q.put(accession)
async def download_genomes(loop: asyncio.AbstractEventLoop,
patric_genomes_filename: str, output_dir: str, num_tasks: int):
"""download_genomes - download all Genbank genomes listed in a PATRIC genome info file"""
q = asyncio.Queue()
fetchers = [loop.create_task(get_genome_to_dir(q, output_dir)) for _ in range(num_tasks)]
for accession in get_genomes(patric_genomes_filename):
await q.put(accession)
await q.put(None)
await asyncio.wait(fetchers)
def main():
"""main - main function to handle command line arguments and call genome downloader"""
parser = argparse.ArgumentParser(description='Fetch PATRIC genomes from NCBI')
parser.add_argument('--num_tasks', type=int, default=4, help='Number of concurrent downloads to run')
parser.add_argument('genomes_filename', help='File with PATRIC genome info')
parser.add_argument('output_dir', help='Directory to write outputs to', default='.')
args = parser.parse_args()
loop = asyncio.get_event_loop()
loop.run_until_complete(download_genomes(loop, args.genomes_filename, args.output_dir, args.num_tasks))
loop.close()
if __name__ == '__main__':
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment