Skip to content

Instantly share code, notes, and snippets.

@winni2k
Last active September 4, 2020 16:40
Show Gist options
  • Save winni2k/67b7c5f640c17501c84cd370f26d3714 to your computer and use it in GitHub Desktop.
Save winni2k/67b7c5f640c17501c84cd370f26d3714 to your computer and use it in GitHub Desktop.
Run bcftools call in parallel across regions in a bed file
channels:
- conda-forge
- bioconda
dependencies:
- python>=3.7
- samtools=1.10
- bcftools=1.10
- pyyaml
- click
# MIT License
#
# Copyright (c) 2020 Warren Winfried Kretzschmar
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
import asyncio
import hashlib
import logging
import shutil
from dataclasses import dataclass
from pathlib import Path
from subprocess import run
import click
from functools import update_wrapper
import yaml
def coro(f):
    """Decorator that lets an ``async def`` click command run synchronously.

    ``asyncio.coroutine`` was deprecated in Python 3.8 and removed in 3.11,
    so the event loop is driven with ``asyncio.run`` instead.  The wrapper
    keeps ``f``'s metadata (name, docstring) so click's help output is right.
    """

    def wrapper(*args, **kwargs):
        # Spin up a fresh event loop, run f to completion, tear the loop down.
        return asyncio.run(f(*args, **kwargs))

    return update_wrapper(wrapper, f)
# from https://asyncio.readthedocs.io/en/latest/subprocess.html
async def run_shell_command(cmd):
    """Run *cmd* through the shell and wait for it to finish.

    stdout/stderr are inherited (not piped), so the returned tuple is
    ``(None, None)`` — callers only use this for the side effect.

    Raises:
        RuntimeError: if the command exits non-zero.  (The original used
            ``assert``, which is silently stripped under ``python -O``.)
    """
    process = await asyncio.create_subprocess_shell(cmd)
    stdout, stderr = await process.communicate()
    if process.returncode != 0:
        raise RuntimeError(
            f"Command exited with code {process.returncode}: {cmd}"
        )
    return stdout, stderr
@dataclass
class Region:
    """The contiguous genomic span covered by a single-chromosome BED file."""

    bed_file: Path
    chrom: str
    # start/end are 1-based inclusive, the convention samtools regions use.
    start: int
    end: int

    @classmethod
    def from_bed_file(cls, bed_file):
        """Build a Region spanning the first start to the last end of *bed_file*.

        Validation raises real exceptions instead of ``assert`` so it still
        fires under ``python -O``.

        Raises:
            FileNotFoundError: if *bed_file* does not exist.
            ValueError: if the file is empty or spans multiple chromosomes.
        """
        bed_file = Path(bed_file)
        if not bed_file.exists():
            raise FileNotFoundError(bed_file)
        with open(bed_file) as fh:
            bed_lines = fh.readlines()
        if not bed_lines:
            raise ValueError(f"BED file is empty: {bed_file}")
        fields = bed_lines[0].split("\t")
        chrom = fields[0]
        # BED coordinates are 0-based half-open; +1 converts the start to the
        # 1-based inclusive convention used by samtools region strings.
        start = int(fields[1]) + 1
        end = int(bed_lines[-1].split("\t")[2])
        if not all(l.startswith(f"{chrom}\t") for l in bed_lines):
            raise ValueError(
                f"BED file {bed_file} contains more than one chromosome"
            )
        return cls(bed_file, chrom, start, end)
# from https://www.pythoncentral.io/hashing-files-with-python/
def get_hash_for_file(file):
    """Return the SHA-1 hex digest of *file*, streamed in 64 KiB chunks."""
    chunk_size = 65536
    digest = hashlib.sha1()
    with open(file, "rb") as fh:
        # iter() with a b"" sentinel yields chunks until EOF.
        for chunk in iter(lambda: fh.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()
def check_invariants(
    wd, bam, bed_file, reference, n_bed_split, call_args, mpileup_args
):
    """Record this run's parameters in ``wd/invariants.yml`` and report a match.

    Returns True iff a previous run recorded exactly the same inputs and
    arguments (so intermediate results can be reused); False otherwise.
    Always overwrites the invariants file with the current values.

    NOTE(review): the keys say ``*_sha256`` but get_hash_for_file computes
    SHA-1 — key names are kept as-is for compatibility with existing
    invariants files.
    """
    invariants_yaml = wd / "invariants.yml"
    previous = None
    if invariants_yaml.exists():
        with open(invariants_yaml) as fh:
            previous = yaml.safe_load(fh)
    current = {
        "n_bed_splits": n_bed_split,
        "bam_sha256": get_hash_for_file(bam),
        "bed_sha256": get_hash_for_file(bed_file),
        "ref_sha256": get_hash_for_file(reference),
        "call_args": call_args,
        "mpileup_args": mpileup_args,
    }
    with open(str(invariants_yaml), "w") as fh:
        fh.write(yaml.dump(current))
    return bool(previous) and previous == current
def merge_variants(output, output_format, *vcfs):
    """Concatenate per-region VCFs into *output* with ``bcftools concat``.

    *output_format* is a ``bcftools view -O`` code (b/u/z/v).
    """
    logging.info(f"Merging calls and outputting to {output}")
    args = f"bcftools concat -O {output_format} -o {output}".split()
    args.extend(vcfs)
    run(args, check=True)
@click.command()
@click.argument("bam", type=click.Path(exists=True))
@click.argument("bed_file", type=click.Path(exists=True))
@click.argument("reference", type=click.Path(exists=True))
@click.option("--jobs", "-j", help="Number of parallel jobs", type=int, default=1)
@click.option(
    "--work-dir",
    "-w",
    help="Work directory in which to create intermediate files",
    type=click.Path(file_okay=False),
    required=False,
)
@click.option("--output", "-o", help="Output file", type=click.Path(writable=True))
@click.option(
    "--output-format",
    "-O",
    help="Output format as defined by bcftools view -O",
    default="b",
    type=click.Choice("b|u|z|v".split("|")),
)
@click.option("--n-bed-split", help="Number of splits of BED file", default=500)
@click.option("--call-args", help="Extra arguments to bcftools call", default="-vm")
@click.option("--mpileup-args", help="Extra arguments to bcftools mpileup", default="")
@coro
async def main(
    bam,
    bed_file,
    reference,
    jobs,
    work_dir,
    output,
    output_format,
    n_bed_split,
    call_args,
    mpileup_args,
):
    """Run bcftools call in parallel across splits of a BED file."""
    click.echo("Running bcftools call in parallel ✨")
    logging.basicConfig(level=logging.INFO)
    max_depth = 50000
    wd = get_wd(output, work_dir)
    logging.info("Checking invariants...")
    rebuild = not check_invariants(
        wd, bam, bed_file, reference, n_bed_split, call_args, mpileup_args
    )
    if rebuild:
        logging.info("Invariants do not match. Rebuilding from scratch...")
        shutil.rmtree(wd)
        wd.mkdir()
        # BUG FIX: rmtree() above deleted the invariants.yml that
        # check_invariants() just wrote, so a finished run would be rebuilt
        # from scratch on every subsequent invocation.  Re-record the
        # invariants into the fresh work directory.
        check_invariants(
            wd, bam, bed_file, reference, n_bed_split, call_args, mpileup_args
        )
    else:
        logging.info("Invariants match. Continuing where we left off..")
    split_bed_files = create_split_beds(bed_file, n_bed_split, wd)
    vcfs = await call_variants(
        bam, jobs, max_depth, reference, split_bed_files, wd, call_args, mpileup_args
    )
    merge_variants(output, output_format, *vcfs)
async def call_variants(
    bam, jobs, max_depth, reference, split_bed_files, wd, call_args, mpileup_args
):
    """Launch one variant-calling task per split BED, at most *jobs* at once.

    Returns the ordered list of per-region VCF paths.  A region whose output
    VCF already exists is skipped, which is what makes interrupted runs
    resumable.
    """
    vcf_dir = wd / "region_vcfs"
    vcf_dir.mkdir(exist_ok=True)
    throttle = asyncio.Semaphore(jobs)
    out_vcfs = []
    pending = []
    for idx, bed in enumerate(split_bed_files):
        region = Region.from_bed_file(bed)
        out_vcf = vcf_dir / f"{idx:0>3d}.vcf"
        out_vcfs.append(out_vcf)
        if out_vcf.exists():
            # Finished on a previous run — nothing to do for this region.
            continue
        pending.append(
            asyncio.create_task(
                call_variants_in_region(
                    bam=bam,
                    max_depth=max_depth,
                    r=region,
                    reference=reference,
                    out_vcf=out_vcf,
                    semaphore=throttle,
                    call_args=call_args,
                    mpileup_args=mpileup_args,
                )
            )
        )
    await asyncio.gather(*pending)
    return out_vcfs
async def call_variants_in_region(
    bam, max_depth, out_vcf, r, reference, semaphore, mpileup_args, call_args
):
    """Call variants for one Region and write *out_vcf* atomically.

    The pipeline writes to ``<out_vcf>.tmp`` and renames on success, so a
    half-written file from a crashed run never looks like a finished VCF.
    """
    tmp_out = Path(f"{out_vcf}.tmp")
    async with semaphore:  # caps the number of concurrent pipelines at --jobs
        pipeline = " | ".join(
            (
                f"samtools view -u {bam} {r.chrom}:{r.start}-{r.end}",
                f"bcftools mpileup {mpileup_args} -T {r.bed_file} -d{max_depth} -Ou -f {reference} -",
                f"bcftools call {call_args} -O b -o {tmp_out}",
            )
        )
        logging.info(pipeline)
        await run_shell_command(pipeline)
        shutil.move(tmp_out, out_vcf)
def create_split_beds(bed_file, n_bed_split, wd):
    """Split *bed_file* into *n_bed_split* line-balanced BED files.

    Copies the input into the work directory and uses GNU ``split -n l/N``
    so no BED line is cut in half.  Returns the list of split-file paths
    (as strings) in order.

    Raises:
        subprocess.CalledProcessError: if ``split`` fails.
        ValueError: if *n_bed_split* needs more than 3 suffix digits.
    """
    wd_bed = wd / "input.bed"
    split_dir = wd / "bed_split"
    shutil.copy(bed_file, wd_bed)
    split_dir.mkdir(exist_ok=True)
    split_bed_prefix = split_dir / "input.bed."
    suffix_length = len(str(n_bed_split))
    if suffix_length >= 4:
        # Explicit raise instead of assert: still enforced under python -O.
        raise ValueError(f"n_bed_split too large: {n_bed_split}")
    run(
        f"split -d -a{suffix_length} -n l/{n_bed_split} {wd_bed} {split_bed_prefix} ",
        shell=True,
        check=True,  # BUG FIX: previously a failed split was silently ignored
    )
    # BUG FIX: `split -a{suffix_length}` emits suffixes exactly
    # suffix_length digits wide, but the old code hard-coded 3-digit names
    # ({i:0>3d}), returning nonexistent paths whenever n_bed_split < 100.
    return [f"{split_bed_prefix}{i:0>{suffix_length}d}" for i in range(n_bed_split)]
def get_wd(output, work_dir):
    """Return the work directory as a Path, creating it if it doesn't exist.

    Falls back to ``<output>.wd`` when --work-dir was not supplied.
    """
    chosen = work_dir if work_dir else output + ".wd"
    wd = Path(chosen)
    wd.mkdir(exist_ok=True)
    return wd
if __name__ == "__main__":
    # BUG FIX: `main` is a synchronous click command — its @coro wrapper
    # already drives the event loop.  Wrapping it in asyncio.run() only
    # appeared to work because click's sys.exit() fired before asyncio.run
    # was ever called with the (non-coroutine) return value.
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment