## Copyright Broad Institute, 2018
##
## This WDL pipeline implements data pre-processing according to the GATK Best Practices
## (June 2016).
##
## Requirements/expectations :
## - Paired-end sequencing data in unmapped BAM (uBAM) format
## - One or more read groups, one per uBAM file, all belonging to a single sample (SM)
## - Input uBAM files must additionally comply with the following requirements:
## - - filenames all have the same suffix (we use ".unmapped.bam")
## - - files must pass validation by ValidateSamFile
## - - reads are provided in query-sorted order
## - - all reads must have an RG tag
##
## Output :
## - A clean BAM file and its index, suitable for variant discovery analyses.
##
## Software version requirements (see recommended dockers in inputs JSON)
## - GATK 4 or later
## - Picard (see gotc docker)
## - Samtools (see gotc docker)
## - Python 2.7
##
## Cromwell version support
## - Successfully tested on v32
## - Does not work on versions < v23 due to output syntax
##
## Runtime parameters are optimized for Broad's Google Cloud Platform implementation.
##
## LICENSING :
## This script is released under the WDL source code license (BSD-3) (see LICENSE in
## https://github.com/broadinstitute/wdl). Note however that the programs it calls may
## be subject to different licenses. Users are responsible for checking that they are
## authorized to run all programs before running this script. Please see the dockers
## for detailed licensing information pertaining to the included programs.
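##
## EXAMPLE INPUTS (illustrative sketch only, not the published Broad inputs JSON):
## Workflow-level declarations below are supplied through a Cromwell inputs JSON keyed on
## "PreProcessingForVariantDiscovery_GATK4.<input>". A minimal fragment might look like the
## following; every path, name and value shown here is a placeholder that must be replaced
## with values appropriate to your environment, and the remaining inputs (references, known
## sites, dockers, disk sizes, etc.) are omitted:
##
##   {
##     "PreProcessingForVariantDiscovery_GATK4.sample_name": "NA12878",
##     "PreProcessingForVariantDiscovery_GATK4.ref_name": "hg38",
##     "PreProcessingForVariantDiscovery_GATK4.flowcell_unmapped_bams_list": "gs://my-bucket/NA12878_ubams.list",
##     "PreProcessingForVariantDiscovery_GATK4.unmapped_bam_suffix": ".unmapped.bam",
##     "PreProcessingForVariantDiscovery_GATK4.bwa_commandline": "bwa mem -K 100000000 -p -v 3 -t 16 -Y $bash_ref_fasta",
##     "PreProcessingForVariantDiscovery_GATK4.compression_level": 5
##   }
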
# WORKFLOW DEFINITION
workflow PreProcessingForVariantDiscovery_GATK4 {

  String sample_name
  String ref_name

  File flowcell_unmapped_bams_list
  String unmapped_bam_suffix

  File ref_fasta
  File ref_fasta_index
  File ref_dict

  String bwa_commandline
  Int compression_level

  File dbSNP_vcf
  File dbSNP_vcf_index
  Array[File] known_indels_sites_VCFs
  Array[File] known_indels_sites_indices

  String gotc_docker
  String gatk_docker
  String python_docker

  String gotc_path
  String gatk_path

  Int flowcell_small_disk
  Int flowcell_medium_disk
  Int agg_small_disk
  Int agg_medium_disk
  Int agg_large_disk

  Int preemptible_tries
  Int agg_preemptible_tries

  String base_file_name = sample_name + "." + ref_name

  Array[File] flowcell_unmapped_bams = read_lines(flowcell_unmapped_bams_list)

  # Get the version of BWA to include in the PG record in the header of the BAM produced
  # by MergeBamAlignment.
  call GetBwaVersion {
    input:
      docker_image = gotc_docker,
      bwa_path = gotc_path,
      preemptible_tries = preemptible_tries
  }

  # Align flowcell-level unmapped input bams in parallel
  scatter (unmapped_bam in flowcell_unmapped_bams) {

    # Get the basename, i.e. strip the filepath and the extension
    String bam_basename = basename(unmapped_bam, unmapped_bam_suffix)

    # Map reads to reference
    call SamToFastqAndBwaMem {
      input:
        input_bam = unmapped_bam,
        bwa_commandline = bwa_commandline,
        output_bam_basename = bam_basename + ".unmerged",
        ref_fasta = ref_fasta,
        ref_fasta_index = ref_fasta_index,
        ref_dict = ref_dict,
        docker_image = gotc_docker,
        bwa_path = gotc_path,
        gotc_path = gotc_path,
        disk_size = flowcell_medium_disk,
        preemptible_tries = preemptible_tries,
        compression_level = compression_level
    }

    # Merge original uBAM and BWA-aligned BAM
    call MergeBamAlignment {
      input:
        unmapped_bam = unmapped_bam,
        bwa_commandline = bwa_commandline,
        bwa_version = GetBwaVersion.version,
        aligned_bam = SamToFastqAndBwaMem.output_bam,
        output_bam_basename = bam_basename + ".aligned.unsorted",
        ref_fasta = ref_fasta,
        ref_fasta_index = ref_fasta_index,
        ref_dict = ref_dict,
        docker_image = gatk_docker,
        gatk_path = gatk_path,
        disk_size = flowcell_medium_disk,
        preemptible_tries = preemptible_tries,
        compression_level = compression_level
    }
  }

  # Aggregate aligned+merged flowcell BAM files and mark duplicates
  # We take advantage of the tool's ability to take multiple BAM inputs and write out a single output
  # to avoid having to spend time just merging BAM files.
  call MarkDuplicates {
    input:
      input_bams = MergeBamAlignment.output_bam,
      output_bam_basename = base_file_name + ".aligned.unsorted.duplicates_marked",
      metrics_filename = base_file_name + ".duplicate_metrics",
      docker_image = gatk_docker,
      gatk_path = gatk_path,
      disk_size = agg_large_disk,
      compression_level = compression_level,
      preemptible_tries = agg_preemptible_tries
  }

  # Sort aggregated+deduped BAM file and fix tags
  call SortAndFixTags {
    input:
      input_bam = MarkDuplicates.output_bam,
      output_bam_basename = base_file_name + ".aligned.duplicate_marked.sorted",
      ref_dict = ref_dict,
      ref_fasta = ref_fasta,
      ref_fasta_index = ref_fasta_index,
      docker_image = gatk_docker,
      gatk_path = gatk_path,
      disk_size = agg_large_disk,
      preemptible_tries = 0,
      compression_level = compression_level
  }

  # Create list of sequences for scatter-gather parallelization
  call CreateSequenceGroupingTSV {
    input:
      ref_dict = ref_dict,
      docker_image = python_docker,
      preemptible_tries = preemptible_tries
  }

  # Perform Base Quality Score Recalibration (BQSR) on the sorted BAM in parallel
  scatter (subgroup in CreateSequenceGroupingTSV.sequence_grouping) {

    # Generate the recalibration model by interval
    call BaseRecalibrator {
      input:
        input_bam = SortAndFixTags.output_bam,
        input_bam_index = SortAndFixTags.output_bam_index,
        recalibration_report_filename = base_file_name + ".recal_data.csv",
        sequence_group_interval = subgroup,
        dbSNP_vcf = dbSNP_vcf,
        dbSNP_vcf_index = dbSNP_vcf_index,
        known_indels_sites_VCFs = known_indels_sites_VCFs,
        known_indels_sites_indices = known_indels_sites_indices,
        ref_dict = ref_dict,
        ref_fasta = ref_fasta,
        ref_fasta_index = ref_fasta_index,
        docker_image = gatk_docker,
        gatk_path = gatk_path,
        disk_size = agg_small_disk,
        preemptible_tries = agg_preemptible_tries
    }
  }

  # Merge the recalibration reports resulting from by-interval recalibration
  call GatherBqsrReports {
    input:
      input_bqsr_reports = BaseRecalibrator.recalibration_report,
      output_report_filename = base_file_name + ".recal_data.csv",
      docker_image = gatk_docker,
      gatk_path = gatk_path,
      disk_size = flowcell_small_disk,
      preemptible_tries = preemptible_tries
  }

  scatter (subgroup in CreateSequenceGroupingTSV.sequence_grouping_with_unmapped) {

    # Apply the recalibration model by interval
    call ApplyBQSR {
      input:
        input_bam = SortAndFixTags.output_bam,
        input_bam_index = SortAndFixTags.output_bam_index,
        output_bam_basename = base_file_name + ".aligned.duplicates_marked.recalibrated",
        recalibration_report = GatherBqsrReports.output_bqsr_report,
        sequence_group_interval = subgroup,
        ref_dict = ref_dict,
        ref_fasta = ref_fasta,
        ref_fasta_index = ref_fasta_index,
        docker_image = gatk_docker,
        gatk_path = gatk_path,
        disk_size = agg_small_disk,
        preemptible_tries = agg_preemptible_tries
    }
  }

  # Merge the recalibrated BAM files resulting from by-interval recalibration
  call GatherBamFiles {
    input:
      input_bams = ApplyBQSR.recalibrated_bam,
      output_bam_basename = base_file_name,
      docker_image = gatk_docker,
      gatk_path = gatk_path,
      disk_size = agg_large_disk,
      preemptible_tries = agg_preemptible_tries,
      compression_level = compression_level
  }

  # Outputs that will be retained when execution is complete
  output {
    File duplication_metrics = MarkDuplicates.duplicate_metrics
    File bqsr_report = GatherBqsrReports.output_bqsr_report
    File analysis_ready_bam = GatherBamFiles.output_bam
    File analysis_ready_bam_index = GatherBamFiles.output_bam_index
    File analysis_ready_bam_md5 = GatherBamFiles.output_bam_md5
  }
}
# TASK DEFINITIONS

# Get version of BWA
task GetBwaVersion {
  Int preemptible_tries
  String mem_size

  String docker_image
  String bwa_path

  command {
    # Not setting "set -o pipefail" here because /bwa has a rc=1 and we don't want to allow rc=1 to succeed
    # because the sed may also fail with that error and that is something we actually want to fail on.
    ${bwa_path}bwa 2>&1 | \
    grep -e '^Version' | \
    sed 's/Version: //'
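    # The pipeline above is expected to print a bare version string such as "0.7.15-r1140"
    # (illustrative; the actual value depends on the bwa build in the gotc docker image).
    # The output block below captures it with read_string(stdout()) and the workflow passes
    # it to MergeBamAlignment as bwa_version.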
  }
  runtime {
    preemptible: preemptible_tries
    docker: docker_image
    memory: mem_size
  }
  output {
    String version = read_string(stdout())
  }
}
# Read unmapped BAM, convert on-the-fly to FASTQ and stream to BWA MEM for alignment
task SamToFastqAndBwaMem {
  File input_bam
  String bwa_commandline
  String output_bam_basename
  File ref_fasta
  File ref_fasta_index
  File ref_dict

  # This is the .alt file from bwa-kit (https://github.com/lh3/bwa/tree/master/bwakit),
  # listing the reference contigs that are "alternative". Leave blank in JSON for legacy
  # references such as b37 and hg19.
  File? ref_alt

  File ref_amb
  File ref_ann
  File ref_bwt
  File ref_pac
  File ref_sa

  Int compression_level
  Int preemptible_tries
  Int disk_size
  String mem_size
  String num_cpu

  String docker_image
  String bwa_path
  String gotc_path
  String java_opt

  command <<<
    set -o pipefail
    set -e

    # set the bash variable needed for the command-line
    bash_ref_fasta=${ref_fasta}
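    # bwa_commandline is expected to reference $bash_ref_fasta, e.g. something like
    # "bwa mem -K 100000000 -p -v 3 -t 16 -Y $bash_ref_fasta" (illustrative value only;
    # the actual command line comes from the inputs JSON and should match the @PG record
    # that MergeBamAlignment writes, since the same string is passed to both tasks).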
    java -Dsamjdk.compression_level=${compression_level} ${java_opt} -jar ${gotc_path}picard.jar \
      SamToFastq \
      INPUT=${input_bam} \
      FASTQ=/dev/stdout \
      INTERLEAVE=true \
      NON_PF=true \
    | \
    ${bwa_path}${bwa_commandline} /dev/stdin - 2> >(tee ${output_bam_basename}.bwa.stderr.log >&2) \
    | \
    samtools view -1 - > ${output_bam_basename}.bam
  >>>
  runtime {
    preemptible: preemptible_tries
    docker: docker_image
    memory: mem_size
    cpu: num_cpu
    disks: "local-disk " + disk_size + " HDD"
  }
  output {
    File output_bam = "${output_bam_basename}.bam"
    File bwa_stderr_log = "${output_bam_basename}.bwa.stderr.log"
  }
}
# Merge original input uBAM file with BWA-aligned BAM file
task MergeBamAlignment {
  File unmapped_bam
  String bwa_commandline
  String bwa_version
  File aligned_bam
  String output_bam_basename
  File ref_fasta
  File ref_fasta_index
  File ref_dict

  Int compression_level
  Int preemptible_tries
  Int disk_size
  String mem_size

  String docker_image
  String gatk_path
  String java_opt

  command {
    # set the bash variable needed for the command-line
    bash_ref_fasta=${ref_fasta}
    ${gatk_path} --java-options "-Dsamjdk.compression_level=${compression_level} ${java_opt}" \
      MergeBamAlignment \
      --VALIDATION_STRINGENCY SILENT \
      --EXPECTED_ORIENTATIONS FR \
      --ATTRIBUTES_TO_RETAIN X0 \
      --ALIGNED_BAM ${aligned_bam} \
      --UNMAPPED_BAM ${unmapped_bam} \
      --OUTPUT ${output_bam_basename}.bam \
      --REFERENCE_SEQUENCE ${ref_fasta} \
      --PAIRED_RUN true \
      --SORT_ORDER "unsorted" \
      --IS_BISULFITE_SEQUENCE false \
      --ALIGNED_READS_ONLY false \
      --CLIP_ADAPTERS false \
      --MAX_RECORDS_IN_RAM 2000000 \
      --ADD_MATE_CIGAR true \
      --MAX_INSERTIONS_OR_DELETIONS -1 \
      --PRIMARY_ALIGNMENT_STRATEGY MostDistant \
      --PROGRAM_RECORD_ID "bwamem" \
      --PROGRAM_GROUP_VERSION "${bwa_version}" \
      --PROGRAM_GROUP_COMMAND_LINE "${bwa_commandline}" \
      --PROGRAM_GROUP_NAME "bwamem" \
      --UNMAPPED_READ_STRATEGY COPY_TO_TAG \
      --ALIGNER_PROPER_PAIR_FLAGS true \
      --UNMAP_CONTAMINANT_READS true
  }
  runtime {
    preemptible: preemptible_tries
    docker: docker_image
    memory: mem_size
    disks: "local-disk " + disk_size + " HDD"
  }
  output {
    File output_bam = "${output_bam_basename}.bam"
  }
}
# Sort BAM file by coordinate order and fix tag values for NM and UQ
task SortAndFixTags {
  File input_bam
  String output_bam_basename
  File ref_dict
  File ref_fasta
  File ref_fasta_index

  Int compression_level
  Int preemptible_tries
  Int disk_size
  String mem_size

  String docker_image
  String gatk_path
  String java_opt_sort
  String java_opt_fix

  command {
    set -o pipefail

    ${gatk_path} --java-options "-Dsamjdk.compression_level=${compression_level} ${java_opt_sort}" \
      SortSam \
      --INPUT ${input_bam} \
      --OUTPUT /dev/stdout \
      --SORT_ORDER "coordinate" \
      --CREATE_INDEX false \
      --CREATE_MD5_FILE false \
    | \
    ${gatk_path} --java-options "-Dsamjdk.compression_level=${compression_level} ${java_opt_fix}" \
      SetNmAndUqTags \
      --INPUT /dev/stdin \
      --OUTPUT ${output_bam_basename}.bam \
      --CREATE_INDEX true \
      --CREATE_MD5_FILE true \
      --REFERENCE_SEQUENCE ${ref_fasta}
  }
  runtime {
    preemptible: preemptible_tries
    docker: docker_image
    memory: mem_size
    disks: "local-disk " + disk_size + " HDD"
  }
  output {
    File output_bam = "${output_bam_basename}.bam"
    File output_bam_index = "${output_bam_basename}.bai"
    File output_bam_md5 = "${output_bam_basename}.bam.md5"
  }
}
# Mark duplicate reads to avoid counting non-independent observations
task MarkDuplicates {
  Array[File] input_bams
  String output_bam_basename
  String metrics_filename

  Int compression_level
  Int preemptible_tries
  Int disk_size
  String mem_size

  String docker_image
  String gatk_path
  String java_opt

  # Task is assuming query-sorted input so that the Secondary and Supplementary reads get marked correctly.
  # This works because the output of BWA is query-grouped and therefore, so is the output of MergeBamAlignment.
  # While query-grouped isn't actually query-sorted, it's good enough for MarkDuplicates with ASSUME_SORT_ORDER="queryname"
  command {
    ${gatk_path} --java-options "-Dsamjdk.compression_level=${compression_level} ${java_opt}" \
      MarkDuplicates \
      --INPUT ${sep=' --INPUT ' input_bams} \
      --OUTPUT ${output_bam_basename}.bam \
      --METRICS_FILE ${metrics_filename} \
      --VALIDATION_STRINGENCY SILENT \
      --OPTICAL_DUPLICATE_PIXEL_DISTANCE 2500 \
      --ASSUME_SORT_ORDER "queryname" \
      --CREATE_MD5_FILE true
  }
  runtime {
    preemptible: preemptible_tries
    docker: docker_image
    memory: mem_size
    disks: "local-disk " + disk_size + " HDD"
  }
  output {
    File output_bam = "${output_bam_basename}.bam"
    File duplicate_metrics = "${metrics_filename}"
  }
}
# Generate sets of intervals for scatter-gathering over chromosomes
task CreateSequenceGroupingTSV {
  File ref_dict

  Int preemptible_tries
  String mem_size

  String docker_image

  # Use python to create the sequence groupings used for the BQSR and ApplyBQSR scatters.
  # The groupings are written to two TSV files which are read back as wdl Array[Array[String]] outputs,
  # e.g. [["1"], ["2"], ["3", "4"], ["5"], ["6", "7", "8"]]
  command <<<
    python <<CODE
    with open("${ref_dict}", "r") as ref_dict_file:
        sequence_tuple_list = []
        longest_sequence = 0
        for line in ref_dict_file:
            if line.startswith("@SQ"):
                line_split = line.split("\t")
                # (Sequence_Name, Sequence_Length)
                sequence_tuple_list.append((line_split[1].split("SN:")[1], int(line_split[2].split("LN:")[1])))
        longest_sequence = sorted(sequence_tuple_list, key=lambda x: x[1], reverse=True)[0][1]

    # We are adding this to the intervals because hg38 has contigs named with embedded colons (:) and a bug in
    # some versions of GATK strips off the last element after a colon, so we add this as a sacrificial element.
    hg38_protection_tag = ":1+"
    # initialize the tsv string with the first sequence
    tsv_string = sequence_tuple_list[0][0] + hg38_protection_tag
    temp_size = sequence_tuple_list[0][1]
    for sequence_tuple in sequence_tuple_list[1:]:
        if temp_size + sequence_tuple[1] <= longest_sequence:
            temp_size += sequence_tuple[1]
            tsv_string += "\t" + sequence_tuple[0] + hg38_protection_tag
        else:
            tsv_string += "\n" + sequence_tuple[0] + hg38_protection_tag
            temp_size = sequence_tuple[1]
    # add the unmapped sequences as a separate line to ensure that they are recalibrated as well
    with open("sequence_grouping.txt", "w") as tsv_file:
        tsv_file.write(tsv_string)
        tsv_file.close()

    tsv_string += '\n' + "unmapped"

    with open("sequence_grouping_with_unmapped.txt", "w") as tsv_file_with_unmapped:
        tsv_file_with_unmapped.write(tsv_string)
        tsv_file_with_unmapped.close()
    CODE
  >>>
  runtime {
    preemptible: preemptible_tries
    docker: docker_image
    memory: mem_size
  }
  output {
    Array[Array[String]] sequence_grouping = read_tsv("sequence_grouping.txt")
    Array[Array[String]] sequence_grouping_with_unmapped = read_tsv("sequence_grouping_with_unmapped.txt")
  }
}
# Generate Base Quality Score Recalibration (BQSR) model
task BaseRecalibrator {
  File input_bam
  File input_bam_index
  String recalibration_report_filename
  Array[String] sequence_group_interval
  File dbSNP_vcf
  File dbSNP_vcf_index
  Array[File] known_indels_sites_VCFs
  Array[File] known_indels_sites_indices
  File ref_dict
  File ref_fasta
  File ref_fasta_index

  Int preemptible_tries
  Int disk_size
  String mem_size

  String docker_image
  String gatk_path
  String java_opt

  command {
    ${gatk_path} --java-options "${java_opt}" \
      BaseRecalibrator \
      -R ${ref_fasta} \
      -I ${input_bam} \
      --use-original-qualities \
      -O ${recalibration_report_filename} \
      --known-sites ${dbSNP_vcf} \
      --known-sites ${sep=" --known-sites " known_indels_sites_VCFs} \
      -L ${sep=" -L " sequence_group_interval}
  }
  runtime {
    preemptible: preemptible_tries
    docker: docker_image
    memory: mem_size
    disks: "local-disk " + disk_size + " HDD"
  }
  output {
    File recalibration_report = "${recalibration_report_filename}"
  }
}
# Combine multiple recalibration tables from scattered BaseRecalibrator runs
# Note that when run from GATK 3.x the tool is not a walker and is invoked differently.
task GatherBqsrReports {
  Array[File] input_bqsr_reports
  String output_report_filename

  Int preemptible_tries
  Int disk_size
  String mem_size

  String docker_image
  String gatk_path
  String java_opt

  command {
    ${gatk_path} --java-options "${java_opt}" \
      GatherBQSRReports \
      -I ${sep=' -I ' input_bqsr_reports} \
      -O ${output_report_filename}
  }
  runtime {
    preemptible: preemptible_tries
    docker: docker_image
    memory: mem_size
    disks: "local-disk " + disk_size + " HDD"
  }
  output {
    File output_bqsr_report = "${output_report_filename}"
  }
}
# Apply Base Quality Score Recalibration (BQSR) model
task ApplyBQSR {
  File input_bam
  File input_bam_index
  String output_bam_basename
  File recalibration_report
  Array[String] sequence_group_interval
  File ref_dict
  File ref_fasta
  File ref_fasta_index

  Int preemptible_tries
  Int disk_size
  String mem_size

  String docker_image
  String gatk_path
  String java_opt

  command {
    ${gatk_path} --java-options "${java_opt}" \
      ApplyBQSR \
      -R ${ref_fasta} \
      -I ${input_bam} \
      -O ${output_bam_basename}.bam \
      -L ${sep=" -L " sequence_group_interval} \
      -bqsr ${recalibration_report} \
      --static-quantized-quals 10 --static-quantized-quals 20 --static-quantized-quals 30 \
      --add-output-sam-program-record \
      --create-output-bam-md5 \
      --use-original-qualities
  }
  runtime {
    preemptible: preemptible_tries
    docker: docker_image
    memory: mem_size
    disks: "local-disk " + disk_size + " HDD"
  }
  output {
    File recalibrated_bam = "${output_bam_basename}.bam"
  }
}
# Combine multiple recalibrated BAM files from scattered ApplyBQSR runs
task GatherBamFiles {
  Array[File] input_bams
  String output_bam_basename

  Int compression_level
  Int preemptible_tries
  Int disk_size
  String mem_size

  String docker_image
  String gatk_path
  String java_opt

  command {
    ${gatk_path} --java-options "-Dsamjdk.compression_level=${compression_level} ${java_opt}" \
      GatherBamFiles \
      --INPUT ${sep=' --INPUT ' input_bams} \
      --OUTPUT ${output_bam_basename}.bam \
      --CREATE_INDEX true \
      --CREATE_MD5_FILE true
  }
  runtime {
    preemptible: preemptible_tries
    docker: docker_image
    memory: mem_size
    disks: "local-disk " + disk_size + " HDD"
  }
  output {
    File output_bam = "${output_bam_basename}.bam"
    File output_bam_index = "${output_bam_basename}.bai"
    File output_bam_md5 = "${output_bam_basename}.bam.md5"
  }
}
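
# EXAMPLE INVOCATION
# A minimal sketch of running this workflow with a local Cromwell jar; the jar name, the WDL
# filename and the inputs JSON filename below are illustrative placeholders, not part of this
# script, and should be adjusted to your environment:
#
#   java -jar cromwell.jar run processing-for-variant-discovery-gatk4.wdl \
#     --inputs processing-for-variant-discovery-gatk4.inputs.json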