Last active Nov 13, 2021
Non-blocking Nextflow workflow with unknown-sized gather step
#!/usr/bin/env nextflow
// Demonstration of hierarchical scatter/gather in Nextflow
// Tasks carry two pieces of metadata: a chr (chromosome) and a reg
// (sub-chromosomal region). The workflow includes a step which gathers
// all regions for a chromosome.
// The process phase_region is fast when chr in {"chr1", "chr2"} and slow
// when chr == "chr3". The gather process for distinct chromosomes
// should not block on tasks processing regions of a different chromosome,
// BUT currently they do.
// The key piece of the puzzle is that grouping the channel of per-region
// results without blocking requires knowing the total number of regions
// for each chromosome. These numbers are not known ahead of time because
// they are calculated within a process. We need to make this information
// accessible to groupTuple(), which must be done using an auxiliary
// counts channel.
// To demonstrate the blocking, the workflow has an option `--cheat True/False`.
// When this is set to True, hard-coded values for the should-be-unknown number
// of regions per chromosome are used to assist the groupTuple(). We observe
// that the gather_regions and extra_magic tasks for chr1 and chr2 are not
// blocked when cheating is enabled. When cheating is disabled blocking occurs
// and no onward tasks are performed until all 8 phase_region tasks are
// complete.
// The non-cheating solution is enabled with `--solution True` which uses
// the auxiliary channel to perform the grouping and also does not block.
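
// A minimal, self-contained sketch of the size-aware grouping described above.
// It is not part of the original workflow: the name `group_key_demo`, the toy
// values and the hard-coded sizes are assumptions for illustration only.
// Wrapping each key with groupKey(key, size) tells groupTuple() how many items
// to expect, so a group can be emitted once `size` items with that key have
// arrived rather than when the upstream channel closes.
// Assuming a Nextflow version that supports the `-entry` option, this could be
// run on its own with: nextflow run <this script> -entry group_key_demo
workflow group_key_demo {
    // hypothetical per-chromosome item counts, known up front in this toy case
    def sizes = ["chr1": 2, "chr2": 1]
    Channel.of(["chr1", "a"], ["chr2", "b"], ["chr1", "c"])
        .map { chr, item -> tuple(groupKey(chr, sizes[chr]), item) }
        .groupTuple()
        .view()
}
// The main workflow differs only in where the sizes come from: they are
// computed inside a process, so they have to travel through a channel.
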
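process make_regions {
    input:
        val chr
    output:
        path "regions.bed"
    """
    #!/usr/bin/env python
    chr = "${chr}"
    # some "random" number the outside world doesn't know
    nregs = {"chr1": 3, "chr2": 1, "chr3": 4}[chr]
    with open("regions.bed", "w") as out:
        for i in range(nregs):
            # assumed loop body (missing from the source): one "<chr>-<index>"
            # line per region, the shape the workflow's regex /(.+)-/ expects
            print("{}-{}".format(chr, i), file=out)
    """
}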
process phase_region {
    input:
        tuple val(chr), val(data), val(region)
    output:
        tuple val(chr), val(region), path("region-${region}.vcf")
    """
    # make chr3 block everything else
    if [[ $chr == "chr3" ]]; then sleep 10; fi
    sleep 1
    echo ${data} > region-${region}.vcf
    """
}
process gather_regions {
    input:
        tuple val(chr), path("vcfs/*")
    output:
        tuple val(chr), path("${chr}.vcf")
    """
    cat vcfs/* > ${chr}.vcf
    """
}
process extra_magic {
    input:
        tuple val(chr), path("chr.vcf")
    """
    echo "Nothing to see here"
    sleep 3
    """
}
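process cat_all {
    publishDir "${params.out}", mode: 'copy', pattern: "all.vcf"
    input:
        path vcfs  // assumed declaration (missing from the source): the per-chrom VCFs
    output:
        path "all.vcf"
    """
    cat *.vcf > all.vcf
    """
}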
workflow {
    // some existential demo data
    data = Channel.of(
        ["chr1", "chr1-data"],
        ["chr2", "chr2-data"],
        ["chr3", "chr3-data"])
    // pull out chroms
    chroms = data.map { it[0] }
    chroms.dump(tag: "chroms")
    // call a magic process that splits chroms into regions
    // and read those regions into a channel
    res = make_regions(chroms)
    regions = res.splitText() {
        // `def` keeps the match local to each closure call
        def chr = (it =~ /(.+)-/)[0]
        [chr[1], it.trim()] }
    regions.dump(tag: "regions")
    // make an auxiliary channel counting the number of regions per-chromosome
    reg_counts = regions
        .groupTuple()  // collect all regions of a chrom so they can be counted
        .map { chr, regs -> tuple(chr, groupKey(chr, regs.size())) }
    reg_counts.dump(tag: "reg_counts")
    // duplicate the per-chrom data for each region
    work_items = data
        .cross(regions)  // pairs each [chr, data] with every [chr, region]
        .map { i, j -> [i[0], i[1], j[1]] }
    work_items.dump(tag: "work_items")
    // do some work on each region, this is the central process
    // that calculates something for all regions of all chromosomes
    phase_region(work_items)
    // collate results for each chrom - THIS IS THE TROUBLESOME PART
    if (params.cheat) {
        // use the should-be-unknown region numbers to group without blocking;
        // this is here simply to demonstrate that knowing the group sizes
        // avoids the block
        nregs = ["chr1": 3, "chr2": 1, "chr3": 4]
        phase_region.out
            .map { chr, reg, vcf -> tuple( groupKey(chr, nregs[chr]), vcf ) }
            .groupTuple()
            .set { vcfs_by_chrom }
    } else if (params.solution) {
        // reg_counts contains chr and *chr, where *chr is our decorated key
        // use vanilla chr to cross with outputs, then use *chr to group
        reg_counts
            .cross(phase_region.out)         // cross is performed using chr
            .map { i, j -> [i[1], j[2]] }    // i[1] is *chr
            .groupTuple()                    // uses *chr as key
            .set { vcfs_by_chrom }
    } else {
        // vanilla groupTuple without knowledge of group sizes:
        // gather_regions cannot proceed until the groupTuple
        // is resolved, which only occurs when all phase_region tasks have
        // finished.
        phase_region.out
            .map { i, j, k -> [i, k] }
            .groupTuple()
            .set { vcfs_by_chrom }
    }
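    vcfs_by_chrom.dump(tag: "vcfs_by_chrom")
    // process that creates resultant per-chrom results
    gather_regions(vcfs_by_chrom)
    // onward processing
    extra_magic(gather_regions.out)
    // collate everything just because
    // (assumed call, missing from the source: pass every per-chrom VCF to cat_all)
    cat_all(gather_regions.out.map { chr, vcf -> vcf }.collect())
}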