Split large VCF for parallel load into Spark
# file: split_vcf_for_spark.wdl
version 1.0

task split_vcf_for_spark {
    # Quickly split a large .vcf.gz file into a specified number of compressed partitions.
    #
    # Motivation: calling SparkContext.textFile on a single large vcf.gz can be painfully slow,
    # because it's decompressed and parsed in ~1 thread. Use this task to split the file first
    # (with a faster, multithreaded pipeline); then tell Spark to load the data in parallel
    # using textFile on a glob pattern (see the PySpark sketch after this task).
    #
    # To maximize parallelism, the input VCF lines are dealt round-robin to the partition
    # files; so each partition is internally sorted, but the Spark RDD/DataFrame would need to
    # be re-sorted if the original total ordering is desired.
    #
    # The VCF header is written to a separate, uncompressed text file.
    input {
        File vcf_gz
        Int partitions = 16
        String name = basename(vcf_gz, ".vcf.gz")
        Int cpu = 16
    }
    command <<<
        set -eux
        export LC_ALL=C
        apt-get -qq update && apt-get -qq -y install pv tabix
        # assume header text is less than 4MB
        mkdir partitions
        bgzip -dc "~{vcf_gz}" | head -c 4194304 | grep ^\# > "partitions/~{name}._HEADER"
        set -o pipefail
        bgzip -dc -@ 4 "~{vcf_gz}" | pv -abtfi 10 | \
            split -n r/~{partitions} --numeric-suffixes=1 -a $(expr length ~{partitions}) -u \
                --additional-suffix=of~{partitions}.vcf.gz \
                --filter='grep -v ^\# | bgzip -c -l 1 > $FILE' - "partitions/~{name}."
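        # split flags: -n r/N deals input lines round-robin into N output files; -u disables
        # output buffering; -a sets the numeric suffix width to the digit count of N; --filter
        # pipes each partition through its own grep|bgzip pipeline (so all N recompress
        # concurrently), with bgzip -l 1 trading compression ratio for speed.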
        # problems with other ways of using split:
        #   -n l/N: needs uncompressed input file
        #   --lines: need total # of lines to control # of output partitions
        #   --line-bytes: liable to cut up individual lines longer than SIZE
        # also, these recompress the partitions one-at-a-time, which is slower.
    >>>
    output {
        File header = glob("partitions/*._HEADER")[0]
        Array[File] partitions_vcf_gz = glob("partitions/*.vcf.gz")
    }
    runtime {
        cpu: cpu
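        # heuristic: ~1G per 8 partitions once partitions >= 16 (e.g. partitions=64 -> 2+64/8 = 10G)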
        memory: "~{2+(if partitions >= 16 then partitions/8 else 2)}G"
        # needs a recent bgzip version supporting multithreading and libdeflate
        docker: "ubuntu:19.10"
    }
}
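For reference, here is a minimal PySpark sketch of the intended consumption pattern. The glob path, column layout, and re-sort step are illustrative assumptions, not outputs wired up by this gist:

    # load_split_vcf.py -- hypothetical consumer of the partition files
    from pyspark.sql import SparkSession
    from pyspark.sql.functions import col

    spark = SparkSession.builder.appName("load_split_vcf").getOrCreate()

    # textFile on the glob gives Spark one task per partition file, instead of
    # decompressing/parsing one huge vcf.gz in a single task
    lines = spark.sparkContext.textFile("partitions/*.vcf.gz")  # assumed path

    # split each data line into CHROM, POS, and the rest of the record
    rows = lines.map(lambda l: tuple(l.split("\t", 2)))
    df = spark.createDataFrame(rows, ["CHROM", "POS", "REST"])

    # the lines were dealt round-robin across files, so re-sort if the original
    # total ordering matters (lexicographic CHROM sort shown for brevity)
    df = df.withColumn("POS", col("POS").cast("int")).sort("CHROM", "POS")

The companion test workflow below downloads a 1000 Genomes chr22 VCF (unless one is supplied), splits it into 64 partitions, and validates that the partitions reassemble to the original contents.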
# companion test workflow (imports the task above)
version 1.0

import "split_vcf_for_spark.wdl" as tasks

workflow test {
    input {
        File? test_vcf_gz
        String test_vcf_url = "https://1000genomes.s3.amazonaws.com/release/20130502/ALL.chr22.phase3_shapeit2_mvncall_integrated_v5a.20130502.genotypes.vcf.gz"
    }

    if (!defined(test_vcf_gz)) {
        call aria2c {
            input:
                url = test_vcf_url
        }
    }
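    # use the caller-supplied VCF if given; otherwise fall back to the downloaded one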
    File eff_test_vcf_gz = select_first([test_vcf_gz, aria2c.file])

    call tasks.split_vcf_for_spark {
        input:
            vcf_gz = eff_test_vcf_gz,
            partitions = 64
    }

    call validate {
        input:
            vcf_gz = eff_test_vcf_gz,
            header = split_vcf_for_spark.header,
            partitions_vcf_gz = split_vcf_for_spark.partitions_vcf_gz
    }
}
task aria2c {
    input {
        String url
        Int connections = 10
    }
    command <<<
        set -euxo pipefail
        mkdir __out
        cd __out
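        # -x: max connections per server; -s: split the download across that many
        # connections; -j: max concurrent downloads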
        aria2c -x ~{connections} -j ~{connections} -s ~{connections} \
            --file-allocation=none --retry-wait=2 --stderr=true \
            "~{url}"
    >>>
    output {
        File file = glob("__out/*")[0]
    }
    runtime {
        docker: "hobbsau/aria2"
    }
}
task validate {
    input {
        File vcf_gz
        File header
        Array[File] partitions_vcf_gz
    }
    command <<<
        set -eux
        export LC_ALL=C
        # compare the headers
        gzip -dc "~{vcf_gz}" | head -c 4194304 | grep ^\# > input_header
        diff input_header "~{header}" 2>&1
        # digest the uncompressed contents of vcf_gz
        set -o pipefail
        gzip -dc "~{vcf_gz}" | grep -v ^\# | sort | sha256sum > input_digest & pid=$!
        # "unsplit" the parts and digest them
cat "~{write_lines(partitions_vcf_gz)}" | xargs -n 999999 gzip -dc | sort | sha256sum > output_digest
# compare the digests
wait $pid
diff input_digest output_digest 2>&1
>>>
runtime {
docker: "ubuntu:19.04"
cpu: 8
}
}