Skip to content

Instantly share code, notes, and snippets.

@cjw85
Last active Nov 13, 2021
Embed
What would you like to do?
Non-blocking Nextflow workflow with unknown-sized gather step
#!/usr/bin/env nextflow
// Demonstration of hierarchical scatter/gather in Nextflow
// Tasks have two pieces of meta a chr (chromosome) and a reg (sub-
// chromosomal region). The workflow includes a step which gathers
// of all regions for a chromosome.
//
// The process phase_region is fast when chr in {"chr1", "chr2"} and slow
// when chr == "chr3". The gather process for distinct chromosomes
// should not block on tasks processing regions of a different chromosome,
// BUT currently they do.
//
// The key piece of the puzzle here is that the grouping of the channel
// containing all per-regions results requires knowledge of the total
// number of regions for each chromosome to not cause a block. These numbers
// are not known ahead of time as they are calculated within a process. We
// need to make this information accessble to the groupTuple() which must
// be done using an auxiliary counts channel.
//
// To demonstrate the blocking the workflow has an option `--cheat True/False`
// When this is set to True hard-coded values for the should-be unknown number
// of regions per-chromosome is used to assist the groupTuple(). We observe
// that the gather_regions and extra_magic tasks for chr1 and chr2 are not
// blocked when cheating is enabled. When cheating is disabled blocking occurs
// and no onward tasks are performed until all 8 phase_region tasks are
// complete.
//
// The non-cheating solution is enabled with `--solution True` which uses
// the auxiliary channel to perform the grouping and also does not block.
nextflow.enable.dsl=2
process make_regions {
input:
val chr
output:
path "regions.bed"
script:
"""
#!/usr/bin/env python
chr = "${chr}"
# some "random" number the outside world doesnt know
nregs = {"chr1": 3, "chr2": 1, "chr3": 4}[chr]
with open("regions.bed", "w") as out:
for i in range(nregs):
out.write(f"{chr}-region_{i}\\n")
"""
}
process phase_region {
input:
tuple val(chr), val(data), val(region)
output:
tuple val(chr), val(region), path("region-${region}.vcf")
script:
"""
# make chr3 block everything else
if [[ $chr == "chr3" ]]; then sleep 10; fi
sleep 1
echo ${data} > region-${region}.vcf
"""
}
process gather_regions {
input:
tuple val(chr), path("vcfs/*")
output:
tuple val(chr), path("${chr}.vcf")
script:
"""
cat vcfs/* > ${chr}.vcf
"""
}
process extra_magic {
input:
tuple val(chr), path("chr.vcf")
output:
path("chr.vcf")
script:
"""
echo "Nothing to see here"
sleep 3
"""
}
process cat_all {
publishDir "${params.out}", mode: 'copy', pattern: "all.vcf"
input:
path("*.vcf")
output:
path("all.vcf")
script:
"""
cat *.vcf > all.vcf
"""
}
workflow {
// some existential demo data
data = Channel.of(
["chr1", "chr1-data"],
["chr2", "chr2-data"],
["chr3", "chr3-data"])
// pull out chroms
chroms = data.map { it[0] }
chroms.dump(tag: "chroms")
// call a magic process that splits chroms into regions
// and read those regions into a channel
res = make_regions(chroms)
regions = res.splitText() {
chr = (it =~ /(.+)-/)[0]
[chr[1], it.trim()] }
regions.dump(tag: "regions")
// make an auxiliary channel counting the number of regions per-chromosome
reg_counts = regions
.groupTuple()
.map {chr, regs -> tuple(chr, groupKey(chr, regs.size())) }
reg_counts.dump(tag: "reg_counts")
// duplicate the per-chrom data for each region
work_items = data
.cross(regions)
.map { i, j -> [i[0], i[1], j[1]] }
work_items.dump(tag: "work_items")
// do some work on each region, this is the central process
// that calculates something for all regions of all chromosomes
phase_region(work_items)
// collate results for each chrom - THIS IS THE TROUBLESOME PART
if (params.cheat) {
// use the unknown region numbers to group without blocking
// this is here simpy to demonstrate knowing the
nregs = ["chr1": 3, "chr2": 1, "chr3": 4]
phase_region.out
.map { chr, reg, vcf -> tuple( groupKey(chr, nregs[chr]), vcf ) }
.groupTuple()
.set { vcfs_by_chrom }
} else if (params.solution) {
// reg_counts contains chr and *chr, where *chr is our decorated key
// use vanilla chr to cross with outputs, then use *chr to group
reg_counts
.cross(phase_region.out) // cross is performed using chr
.map { i, j -> [i[1], j[2]] } // i[1] is *chr
.groupTuple() // uses *chr as key
.set { vcfs_by_chrom }
} else {
// vanilla groupBy without knowledge of group sizes,
// execution of gather_regions cannot process until the groupTuple
// is resolved, that only occurs when all phase_region tasks have
// finished.
phase_region.out
.groupTuple()
.map { i, j, k -> [i, k] }
.set { vcfs_by_chrom }
}
vcfs_by_chrom.dump(tag: "vcfs_by_chrom")
// process that creates resultant per-chrom results
gather_regions(vcfs_by_chrom)
// onward processing
extra_magic(gather_regions.out)
// collate everything just because
cat_all(extra_magic.out.collect())
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment