Last active
September 4, 2020 16:40
-
-
Save winni2k/67b7c5f640c17501c84cd370f26d3714 to your computer and use it in GitHub Desktop.
Run bcftools call in parallel across regions in a bed file
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
channels: | |
- conda-forge | |
- bioconda | |
dependencies: | |
- python>=3.7 | |
- samtools=1.10 | |
- bcftools=1.10 | |
- pyyaml | |
- click |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# MIT License | |
# | |
# Copyright (c) 2020 Warren Winfried Kretzschmar | |
# | |
# Permission is hereby granted, free of charge, to any person obtaining a copy | |
# of this software and associated documentation files (the "Software"), to deal | |
# in the Software without restriction, including without limitation the rights | |
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell | |
# copies of the Software, and to permit persons to whom the Software is | |
# furnished to do so, subject to the following conditions: | |
# | |
# The above copyright notice and this permission notice shall be included in all | |
# copies or substantial portions of the Software. | |
# | |
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR | |
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, | |
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE | |
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER | |
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, | |
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE | |
# SOFTWARE. | |
import asyncio | |
import hashlib | |
import logging | |
import shutil | |
from dataclasses import dataclass | |
from pathlib import Path | |
from subprocess import run | |
import click | |
from functools import update_wrapper | |
import yaml | |
def coro(f): | |
f = asyncio.coroutine(f) | |
def wrapper(*args, **kwargs): | |
loop = asyncio.get_event_loop() | |
return loop.run_until_complete(f(*args, **kwargs)) | |
return update_wrapper(wrapper, f) | |
# from https://asyncio.readthedocs.io/en/latest/subprocess.html | |
async def run_shell_command(cmd): | |
# Create subprocess | |
process = await asyncio.create_subprocess_shell( | |
cmd, | |
# stdout=asyncio.subprocess.PIPE, | |
# stderr=asyncio.subprocess.PIPE, | |
) | |
# Wait for the subprocess to finish | |
stdout, stderr = await process.communicate() | |
assert process.returncode == 0 | |
return stdout, stderr | |
# Return stdout | |
# return stdout.decode().strip(), stderr.decode().strip() | |
@dataclass | |
class Region: | |
bed_file: Path | |
chrom: str | |
start: int | |
end: int | |
@classmethod | |
def from_bed_file(cls, bed_file): | |
bed_file = Path(bed_file) | |
assert bed_file.exists() | |
with open(bed_file) as fh: | |
bed_lines = fh.readlines() | |
fields = bed_lines[0].split("\t") | |
chrom = fields[0] | |
start = int(fields[1]) + 1 | |
end = int(bed_lines[-1].split("\t")[2]) | |
assert all(l.startswith(f"{chrom}\t") for l in bed_lines) | |
return cls(bed_file, chrom, start, end) | |
# from https://www.pythoncentral.io/hashing-files-with-python/ | |
def get_hash_for_file(file): | |
BLOCKSIZE = 65536 | |
hasher = hashlib.sha1() | |
with open(file, "rb") as afile: | |
buf = afile.read(BLOCKSIZE) | |
while len(buf) > 0: | |
hasher.update(buf) | |
buf = afile.read(BLOCKSIZE) | |
return hasher.hexdigest() | |
def check_invariants( | |
wd, bam, bed_file, reference, n_bed_split, call_args, mpileup_args | |
): | |
"""Returns False if invariants do not match""" | |
new_invariants = { | |
"n_bed_splits": n_bed_split, | |
"bam_sha256": get_hash_for_file(bam), | |
"bed_sha256": get_hash_for_file(bed_file), | |
"ref_sha256": get_hash_for_file(reference), | |
"call_args": call_args, | |
"mpileup_args": mpileup_args, | |
} | |
invariants_yaml = wd / "invariants.yml" | |
current_invariants = None | |
if invariants_yaml.exists(): | |
with open(invariants_yaml) as fh: | |
current_invariants = yaml.safe_load(fh) | |
with open(str(invariants_yaml), "w") as fh: | |
yaml_str = yaml.dump(new_invariants) | |
fh.write(yaml_str) | |
if not current_invariants or current_invariants != new_invariants: | |
return False | |
return True | |
def merge_variants(output, output_format, *vcfs): | |
logging.info(f"Merging calls and outputting to {output}") | |
run( | |
f"bcftools concat -O {output_format} -o {output}".split() + list(vcfs), | |
check=True, | |
) | |
@click.command() | |
@click.argument("bam", type=click.Path(exists=True)) | |
@click.argument("bed_file", type=click.Path(exists=True)) | |
@click.argument("reference", type=click.Path(exists=True)) | |
@click.option("--jobs", "-j", help="Number of parallel jobs", type=int, default=1) | |
@click.option( | |
"--work-dir", | |
"-w", | |
help="Work directory in which to create intermediate files", | |
type=click.Path(file_okay=False), | |
required=False, | |
) | |
@click.option("--output", "-o", help="Output file", type=click.Path(writable=True)) | |
@click.option( | |
"--output-format", | |
"-O", | |
help="Output format as defined by bcftools view -O", | |
default="b", | |
type=click.Choice("b|u|z|v".split("|")), | |
) | |
@click.option("--n-bed-split", help="Number of splits of BED file", default=500) | |
@click.option("--call-args", help="Extra arguments to bcftools call", default="-vm") | |
@click.option("--mpileup-args", help="Extra arguments to bcftools mpileup", default="") | |
@coro | |
async def main( | |
bam, | |
bed_file, | |
reference, | |
jobs, | |
work_dir, | |
output, | |
output_format, | |
n_bed_split, | |
call_args, | |
mpileup_args, | |
): | |
click.echo("Runing bcftools call in parallel ✨") | |
logging.basicConfig(level=logging.INFO) | |
max_depth = 50000 | |
wd = get_wd(output, work_dir) | |
logging.info("Checking invariants...") | |
rebuild = not check_invariants( | |
wd, bam, bed_file, reference, n_bed_split, call_args, mpileup_args | |
) | |
if rebuild: | |
logging.info("Invariants do not match. Rebuilding from scratch...") | |
shutil.rmtree(wd) | |
wd.mkdir() | |
else: | |
logging.info("Invariants match. Continuing where we left off..") | |
split_bed_files = create_split_beds(bed_file, n_bed_split, wd) | |
vcfs = await call_variants( | |
bam, jobs, max_depth, reference, split_bed_files, wd, call_args, mpileup_args | |
) | |
merge_variants(output, output_format, *vcfs) | |
async def call_variants( | |
bam, jobs, max_depth, reference, split_bed_files, wd, call_args, mpileup_args | |
): | |
vcf_dir = wd / "region_vcfs" | |
vcf_dir.mkdir(exist_ok=True) | |
out_vcfs = [] | |
vcf_tasks = [] | |
call_sem = asyncio.Semaphore(jobs) | |
for idx, bed in enumerate(split_bed_files): | |
r = Region.from_bed_file(bed) | |
out_vcf = vcf_dir / f"{idx:0>3d}.vcf" | |
out_vcfs.append(out_vcf) | |
if not out_vcf.exists(): | |
vcf_tasks.append( | |
asyncio.create_task( | |
call_variants_in_region( | |
bam=bam, | |
max_depth=max_depth, | |
r=r, | |
reference=reference, | |
out_vcf=out_vcf, | |
semaphore=call_sem, | |
call_args=call_args, | |
mpileup_args=mpileup_args, | |
) | |
) | |
) | |
await asyncio.gather(*vcf_tasks) | |
return out_vcfs | |
async def call_variants_in_region( | |
bam, max_depth, out_vcf, r, reference, semaphore, mpileup_args, call_args | |
): | |
tmp_out = Path(f"{out_vcf}.tmp") | |
async with semaphore: | |
call_cmd = ( | |
f"samtools view -u {bam} {r.chrom}:{r.start}-{r.end} " | |
f"| bcftools mpileup {mpileup_args} -T {r.bed_file} -d{max_depth} -Ou -f {reference} - " | |
f"| bcftools call {call_args} -O b -o {tmp_out}" | |
) | |
logging.info(call_cmd) | |
await run_shell_command(call_cmd) | |
shutil.move(tmp_out, out_vcf) | |
def create_split_beds(bed_file, n_bed_split, wd): | |
wd_bed = wd / "input.bed" | |
split_dir = wd / "bed_split" | |
shutil.copy(bed_file, wd_bed) | |
split_dir.mkdir(exist_ok=True) | |
split_bed_prefix = split_dir / "input.bed." | |
suffix_length = len(str(n_bed_split)) | |
assert suffix_length < 4 | |
run( | |
f"split -d -a{suffix_length} -n l/{n_bed_split} {wd_bed} {split_bed_prefix} ", | |
shell=True, | |
) | |
return [f"{split_bed_prefix}{i:0>3d}" for i in range(n_bed_split)] | |
def get_wd(output, work_dir): | |
if not work_dir: | |
work_dir = output + ".wd" | |
wd = Path(work_dir) | |
wd.mkdir(exist_ok=True) | |
return wd | |
if __name__ == "__main__": | |
asyncio.run(main()) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment