Last active
November 14, 2021 06:07
-
-
Save TTTPOB/1a8a960474a6a784f2da215b03ab3cc9 to your computer and use it in GitHub Desktop.
download fastq from accession number and description from ena using ascp, fast and support md5 validation and retry when failed
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# from fastq_downloader.helper import construct_gsm_dict, download_and_verify, rename_files_by_run, merge_files | |
from fastq_downloader.helper.construct_gsm_dict import infotsv_to_dict, parse_acc_type | |
from fastq_downloader.helper.download_and_verify import ( | |
download_and_verify__, | |
check_file, | |
) | |
from fastq_downloader.helper.rename_files_by_run import ( | |
write_filename_to_dict, | |
actual_rename, | |
) | |
from fastq_downloader.helper.merge_files import merge_files | |
from fastq_downloader.helper.accession_to_links import gsm2link_md5, srx2link_md5 | |
from pathlib import Path | |
import json | |
from glob import glob | |
import os | |
import subprocess | |
import re | |
import shutil | |
# please invoke this script with --config set | |
# infotsv, a tsv file specified accession number and description | |
# output_dir, a directory to store the downloaded files | |
infotsv = config["infotsv"] | |
output_dir = config["output_dir"] | |
refresh_acc = config.get("refresh_acc", False) | |
retry = config.get("retry", 3) | |
tmp_dir = Path(output_dir, ".tmp") | |
tmp_dir = str(tmp_dir) | |
ascp_bin = subprocess.check_output("which ascp", shell=True).decode("utf-8").strip() | |
ascp_privkey = Path(ascp_bin).parent.parent / "etc" / "asperaweb_id_dsa.openssh" | |
if not ascp_privkey.exists(): | |
raise Exception("ascp private key not found: {}".format(ascp_privkey)) | |
infodict = infotsv_to_dict(infotsv) | |
infodict = parse_acc_type(infodict) | |
acc_list = list(infodict.keys()) | |
if (not Path(tmp_dir).exists()) or refresh_acc: | |
Path(tmp_dir).mkdir(exist_ok=True, parents=True) | |
for acc, info in infodict.items(): | |
Path(tmp_dir, "acc_json").mkdir(exist_ok=True, parents=True) | |
with open(Path(tmp_dir, "acc_json", f"{acc}.json"), "w") as f: | |
json.dump(info, f) | |
rule all: | |
input: | |
expand(tmp_dir + "/acc_json/{acc}.json", acc=acc_list), | |
expand(tmp_dir + "/acc_json_with_ascp/{acc}.json", acc=acc_list), | |
expand(tmp_dir + "/acc_json_with_rename/{acc}.json", acc=acc_list), | |
expand(output_dir + "/merged_fastq/{acc}/", acc=acc_list), | |
rule get_dict_with_ascp: | |
input: | |
tmp_dir + "/acc_json/{acc}.json", | |
output: | |
tmp_dir + "/acc_json_with_ascp/{acc}.json", | |
run: | |
with open(input[0]) as f: | |
info = json.load(f) | |
if info["type"] == "srx": | |
info["ascp"] = srx2link_md5(wildcards.acc) | |
elif info["type"] == "gsm": | |
info["ascp"] = gsm2link_md5(wildcards.acc) | |
else: | |
ValueError( | |
"accession type is not gsm or srx (please note the letter should be lower case)" | |
) | |
with open(output[0], "w") as f: | |
json.dump(info, f) | |
rule get_ascp_filename: | |
input: | |
rules.get_dict_with_ascp.output[0], | |
output: | |
tmp_dir + "/acc_json_with_rename/{acc}.json", | |
run: | |
with open(input[0]) as f: | |
info = json.load(f) | |
info = write_filename_to_dict(info) | |
with open(output[0], "w") as f: | |
json.dump(info, f) | |
checkpoint split_ascp_dict: | |
input: | |
rules.get_ascp_filename.output[0], | |
output: | |
directory(tmp_dir + "/ascp_split/{acc}"), | |
run: | |
info = json.load(open(input[0])) | |
for link, ascpinfo in info["ascp"].items(): | |
ascpinfo["link"] = link | |
orig_name = ascpinfo["orig_name"] | |
if "_1" in orig_name: | |
ascpinfo["mate"] = "R1" | |
elif "_2" in orig_name: | |
ascpinfo["mate"] = "R2" | |
elif "_" not in orig_name: | |
ascpinfo["mate"] = "R_" | |
else: | |
ValueError("mate is not R1 or R2, and not single end read") | |
Path(tmp_dir + f"/ascp_split/{wildcards.acc}/{ascpinfo['mate']}").mkdir( | |
exist_ok=True, parents=True | |
) | |
with open(f"{output[0]}/{ascpinfo['mate']}/{orig_name}.json", "w") as f: | |
json.dump(ascpinfo, f) | |
rule download_for_r1: | |
input: | |
tmp_dir + "/ascp_split/{acc}/R1/{orig_name}.json", | |
output: | |
temp(tmp_dir + "/unmerged_fastq/{acc}/R1/{orig_name}"), | |
run: | |
for i in range(retry): | |
try: | |
with open(input[0]) as f: | |
info = json.load(f) | |
link = info["link"] | |
orig_name = info["orig_name"] | |
outdir = str(Path(output[0]).parent) | |
download_and_verify__(link, info, ascp_privkey, outdir) | |
passed = True | |
break | |
except Exception as e: | |
print(e) | |
print(f"retry {i+1}/{retry}") | |
continue | |
if passed: | |
print(f"{orig_name} downloaded successfully") | |
else: | |
raise Exception(f"{orig_name} failed to download after {retry} retries") | |
use rule download_for_r1 as download_for_r2 with: | |
input: | |
tmp_dir + "/ascp_split/{acc}/R2/{orig_name}.json", | |
output: | |
temp(tmp_dir + "/unmerged_fastq/{acc}/R2/{orig_name}"), | |
use rule download_for_r1 as download_for_single with: | |
input: | |
tmp_dir + "/ascp_split/{acc}/R_/{orig_name}.json", | |
output: | |
temp(tmp_dir + "/unmerged_fastq/{acc}/R_/{orig_name}"), | |
def get_downloaded_files(wildcards): | |
ascp_checkpoint_dir = checkpoints.split_ascp_dict.get(**wildcards).output[0] | |
mate = glob(f"{ascp_checkpoint_dir}/*") | |
if len(mate) == 2: | |
print(f"{wildcards.acc} is paired end") | |
outputs = expand( | |
tmp_dir + "/unmerged_fastq/{acc}/R1/{orig_name}", | |
acc=wildcards.acc, | |
orig_name=glob_wildcards( | |
os.path.join( | |
tmp_dir, "ascp_split", wildcards.acc, "R1/{orig_name}.json" | |
) | |
).orig_name, | |
) + expand( | |
tmp_dir + "/unmerged_fastq/{acc}/R2/{orig_name}", | |
acc=wildcards.acc, | |
orig_name=glob_wildcards( | |
os.path.join( | |
tmp_dir, "ascp_split", wildcards.acc, "R2/{orig_name}.json" | |
) | |
).orig_name, | |
) | |
if len(mate) == 1: | |
print(f"{wildcards.acc} is single end") | |
outputs = expand( | |
tmp_dir + "/unmerged_fastq/{acc}/R_/{orig_name}", | |
acc=wildcards.acc, | |
orig_name=glob_wildcards( | |
os.path.join( | |
tmp_dir, "ascp_split", wildcards.acc, "R_/{orig_name}.json" | |
) | |
).orig_name, | |
) | |
return outputs | |
rule merge_files: | |
input: | |
[get_downloaded_files], | |
accdict=tmp_dir + "/acc_json/{acc}.json", | |
output: | |
directory(output_dir + "/merged_fastq/{acc}"), | |
run: | |
# tell if all of the files are of the same type | |
input_filename = input[:-1] | |
filename_list = [Path(i).name for i in input_filename] | |
if_underscore_list = ["_" in Path(i).name for i in filename_list] | |
if len(set(if_underscore_list)) != 1: | |
raise Exception( | |
f"accession {wildcards.acc} the files are not of the same read type\n" | |
+ f"filename_list: {filename_list},\n if_underscore_list: {if_underscore_list}" | |
) | |
# determinter if they are single end or paired end | |
with open(input.accdict) as f: | |
accdict = json.load(f) | |
desp = accdict["desp"] | |
r1_newname = desp + "_R1.fastq.gz" | |
r2_newname = desp + "_R2.fastq.gz" | |
r__newname = desp + ".fastq.gz" | |
def merge_(subdict): | |
with open(subdict["newname"], "wb") as wfd: | |
for f in subdict["oldnames"]: | |
with open(f, "rb") as fd: | |
shutil.copyfileobj(fd, wfd) | |
Path(output[0]).mkdir(exist_ok=True, parents=True) | |
if all(if_underscore_list): | |
r1_list = [i for i in input_filename if "_1" in Path(i).name] | |
r2_list = [i for i in input_filename if "_2" in Path(i).name] | |
r1_list.sort() | |
r2_list.sort() | |
print(input_filename) | |
mate_dict = { | |
"R1": {"oldnames": r1_list, "newname": r1_newname}, | |
"R2": {"oldnames": r2_list, "newname": r2_newname}, | |
} | |
print(mate_dict) | |
print(input) | |
if len(r1_list) != len(r2_list): | |
raise Exception( | |
f"accession {wildcards.acc} r1 and r2 file numbers are not equal" | |
) | |
# if only one run, only move, no need to merge | |
if len(r1_list) == 1: | |
for mate, subdict in mate_dict.items(): | |
shutil.move( | |
subdict["oldnames"][0], output[0] + "/" + subdict["newname"] | |
) | |
else: | |
for mate, info in mate_dict.items(): | |
merge_(info) | |
else: | |
r__list = [i for i in input_filename] | |
r__list.sort() | |
print(r__list) | |
print(input) | |
mate_dict = {"R_": {"oldnames": r__list, "newname": r__newname}} | |
if len(r__list) == 1: | |
shutil.move(r__list[0], output[0] + "/" + r__newname) | |
else: | |
for mate, info in mate_dict.items(): | |
merge_(info) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment