Last active
August 30, 2022 13:09
-
-
Save cjw85/d334352e49ddd2e8bf2bd8e3891f3fe5 to your computer and use it in GitHub Desktop.
Non-blocking Nextflow workflow with unknown-sized gather step
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env nextflow | |
// Demonstration of hierarchical scatter/gather in Nextflow | |
// Tasks carry two pieces of metadata: a chr (chromosome) and a reg (sub-
// chromosomal region). The workflow includes a step which gathers the
// results of all regions for a chromosome.
// | |
// The process do_stuff_for_region is fast when chr in {"chr1", "chr2"} and
// slow when chr == "chr3". The gather process for one chromosome should not
// block on tasks processing regions of a different chromosome,
// BUT with a plain groupTuple() it does.
// | |
// The key piece of the puzzle here is that the grouping of the channel | |
// containing all per-regions results requires knowledge of the total | |
// number of regions for each chromosome to not cause a block. These numbers | |
// are not known ahead of time as they are calculated within a process. We | |
// need to make this information accessible to the groupTuple(), which must
// be done using an auxiliary counts channel.
// | |
// To demonstrate the blocking the workflow has an option `--cheat True/False`.
// When this is set to True, hard-coded values for the should-be-unknown number
// of regions per chromosome are used to assist the groupTuple(). We observe
// that the gather_regions_for_chr and extra_magic tasks for chr1 and chr2 are
// not blocked when cheating is enabled. When cheating is disabled blocking
// occurs and no onward tasks are performed until all 8 do_stuff_for_region
// tasks are complete.
// | |
// The non-cheating solution is enabled with `--solution True` which uses | |
// the auxiliary channel to perform the grouping and also does not block. | |
nextflow.enable.dsl=2

// Defaults for the command-line options this script reads. Without them an
// unset option evaluates to null, which makes the publishDir of cat_all
// resolve to a directory literally named "null".
//   --cheat     group per-region results using hard-coded region counts
//   --solution  group per-region results using the auxiliary counts channel
//   --out       directory that receives the final all.vcf
params.cheat = false
params.solution = false
params.out = "output"
// Write the list of regions for one chromosome to regions.bed, one
// "<chr>-region_<i>" name per line.
//
// input:  chr - chromosome name; must be one of chr1, chr2, chr3 because the
//               region count is looked up in a hard-coded table.
// output: regions.bed - text file with one region name per line.
//
// The number of regions is deliberately decided inside the task so that the
// rest of the workflow cannot know it ahead of time.
process make_regions {
    input:
        val chr
    output:
        path "regions.bed"
    script:
    // The task body is Python: block indentation is significant, and the
    // variable is called `chrom` so the Python builtin chr() is not shadowed.
    """
    #!/usr/bin/env python
    chrom = "${chr}"
    # some "random" number the outside world doesn't know
    nregs = {"chr1": 3, "chr2": 1, "chr3": 4}[chrom]
    with open("regions.bed", "w") as out:
        for i in range(nregs):
            out.write(f"{chrom}-region_{i}\\n")
    """
}
// Per-region worker.
//
// input:  [chr, data, region] - chromosome name, its payload, and the region
//         name this task is responsible for.
// output: [chr, region, region-<region>.vcf] - the file holds the payload.
//
// Every task sleeps for one second; tasks for chr3 sleep ten seconds more so
// that chr3 is the slow chromosome of the demonstration.
process do_stuff_for_region {
    input:
        tuple val(chr), val(data), val(region)
    output:
        tuple val(chr), val(region), path("region-${region}.vcf")
    script:
    """
    # make chr3 block everything else
    if [[ ${chr} == "chr3" ]]; then sleep 10; fi
    sleep 1
    echo ${data} > region-${region}.vcf
    """
}
// Per-chromosome gather step.
//
// input:  [chr, files] - the chromosome key and its per-region files, which
//         are staged under the vcfs/ directory of the task.
// output: [chr, <chr>.vcf] - concatenation of everything staged in vcfs/.
process gather_regions_for_chr {
    input:
        tuple val(chr), path("vcfs/*")
    output:
        tuple val(chr), path("${chr}.vcf")
    script:
    """
    cat vcfs/* > ${chr}.vcf
    """
}
// Stand-in for further per-chromosome processing.
//
// input:  [chr, file] - the file is staged as chr.vcf; chr is not used by
//         the script.
// output: chr.vcf - the staged input, passed through unchanged.
//
// The script only prints a message and sleeps for three seconds.
process extra_magic {
    input:
        tuple val(chr), path("chr.vcf")
    output:
        path "chr.vcf"
    script:
    """
    echo "Nothing to see here"
    sleep 3
    """
}
// Final gather: concatenate every incoming file into all.vcf and copy that
// file into the directory named by params.out.
//
// input:  any number of files, staged under names matching *.vcf.
// output: all.vcf
process cat_all {
    publishDir "${params.out}", mode: 'copy', pattern: "all.vcf"
    input:
        path "*.vcf"
    output:
        path "all.vcf"
    script:
    """
    cat *.vcf > all.vcf
    """
}
// Entry workflow: split each chromosome into regions, process every region,
// gather the per-region results per chromosome, post-process each chromosome
// and finally concatenate everything.
//
// params.cheat and params.solution select how the per-region results are
// grouped by chromosome; with neither set a plain groupTuple() is used.
workflow {
    // some existential demo data: one [chr, payload] tuple per chromosome
    data = Channel.of(
        ["chr1", "chr1-data"],
        ["chr2", "chr2-data"],
        ["chr3", "chr3-data"])

    // pull out chroms
    chroms = data.map { it[0] }
    chroms.dump(tag: "chroms")

    // call a magic process that splits chroms into regions
    // and read those regions into a channel of [chr, region] tuples
    res = make_regions(chroms)
    regions = res.splitText() {
        // `def` keeps the match local to this closure instead of writing
        // to a variable shared through the script binding
        def match = (it =~ /(.+)-/)[0]
        [match[1], it.trim()] }
    regions.dump(tag: "regions")

    // make an auxiliary channel counting the number of regions per-chromosome;
    // each item is [chr, *chr] where *chr is chr decorated with its group size
    reg_counts = regions
        .groupTuple()
        .map { chr, regs -> tuple(chr, groupKey(chr, regs.size())) }
    reg_counts.dump(tag: "reg_counts")

    // duplicate the per-chrom data for each region: [chr, payload, region]
    work_items = data
        .cross(regions)
        .map { i, j -> [i[0], i[1], j[1]] }
    work_items.dump(tag: "work_items")

    // do some work on each region, this is the central process
    // that calculates something for all regions of all chromosomes
    do_stuff_for_region(work_items)

    // collate results for each chrom - THIS IS THE TROUBLESOME PART
    if (params.cheat) {
        // use the unknown region numbers to group without blocking
        // this is here simply to demonstrate that knowing the number of
        // items in each group allows a non-blocking groupTuple
        def nregs = ["chr1": 3, "chr2": 1, "chr3": 4]
        do_stuff_for_region.out
            .map { chr, reg, vcf -> tuple( groupKey(chr, nregs[chr]), vcf ) }
            .groupTuple()
            .set { vcfs_by_chrom }
    } else if (params.solution) {
        // reg_counts contains chr and *chr, where *chr is our decorated key
        // use vanilla chr to cross with outputs, then use *chr to group
        reg_counts
            .cross(do_stuff_for_region.out)  // cross is performed using chr
            .map { i, j -> [i[1], j[2]] }    // i[1] is *chr
            .groupTuple()                    // uses *chr as key
            .set { vcfs_by_chrom }
    } else {
        // vanilla groupTuple without knowledge of group sizes,
        // execution of gather_regions_for_chr cannot proceed until the
        // groupTuple is resolved, which only occurs when all
        // do_stuff_for_region tasks have finished.
        do_stuff_for_region.out
            .groupTuple()
            .map { i, j, k -> [i, k] }
            .set { vcfs_by_chrom }
    }
    vcfs_by_chrom.dump(tag: "vcfs_by_chrom")

    // process that creates resultant per-chrom results
    gather_regions_for_chr(vcfs_by_chrom)

    // onward processing
    extra_magic(gather_regions_for_chr.out)

    // collate everything just because
    cat_all(extra_magic.out.collect())
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment