Skip to content

Instantly share code, notes, and snippets.

@fortune9
Last active February 15, 2023 03:38
Show Gist options
  • Save fortune9/a986df3c44bf988d649afbd5d84ae949 to your computer and use it in GitHub Desktop.
Save fortune9/a986df3c44bf988d649afbd5d84ae949 to your computer and use it in GitHub Desktop.
Learning Notes on NextFlow

Notes on NextFlow

Zhenguo Zhang February 14, 2023

NextFlow is a platform to run data analysis pipelines written in any language. It provides an abstraction layer between a pipeline’s logic and the underlying execution layers. Therefore, the pipeline written in NextFlow is portable.

The implementation of NextFlow uses Bash and Groovy language (a super set of Java). To learn more about Groovy, one can check this link.

Installation

There are two ways to install NextFlow:

# 1. installed in current directory
curl -fsSL https://get.nextflow.io | bash 

#To install a specific version, one need set environment variable
#`NXF_VER` first, such as

export NXF_VER=22.9.0-edge
curl -fsSL https://get.nextflow.io | bash

# 2. install using Bioconda, may be outdated
conda install -c bioconda nextflow 

Run it

The command to run it is as follows:

# run through a github repo
nextflow run git-name/repo --opt1 val1
# or a local script
nextflow run local.nf --opt1 val1

To run a command in background (continue running even terminate the terminal), use the option ‘-bg’. Note that nohup can’t be used with nextflow because the pipeline will stopped.

Nextflow pipeline can also be run from public repositories such as github, gitlab, etc. To do so, one can run in one of the following way:

# find the pipeline in public repo host, default github
nextflow run owner/repoName -hub github
# or specify the full path to the repo, then option '-hub' is unneeded.
nextflow run http://github.com/owner/repoName

To access private repo, provide -user argument and input password when prompted, or setup SCM file by following the instruction here.

Here are some nextflow commands to manage repositories (see this doc for details):

command explanation
list show all downloaded repos.
info show info of a downloaded repo.
pull pull a repo.
view print out the pipeline script, such as main.nf
clone clone repo to a target folder
drop delete a downloaded repo.

Structure

A nextflow pipeline consists of processes, and each process is block of code that run a task. A process contains input, output, and the code to process input and generate output. The running of each process is independent in its own temporarily generated folder and don’t share common state.

Another important component for nextflow is channel. Channel connects output of preceeding/feeding processes to the input of following/consuming processes. The consuming processes needn’t wait for the completion of the feeding process, as long as data avaliable in the channel, the consuming processes are started. Because of this structure, the parallel processing is achieved.

A process can define one or more input and outpur channels, and these channels connect processes and drive the running of pipelines.

There are two types of channels: queue and value. The queue channels are created implicitly by channel factories such as Channel.from() and Channel.fromPath(), or by process output definitions. They are asynchronous unidirectional FIFO queues.

Note: in old nextflow version, a queue channel can only be consumed once. For example, the following code will show that the second call of ‘ch.view()’ would fail.

ch=Channel.from(1,2,3)
ch.view()
ch.view()

However, a value channel, which is bound to a single value, can be consumed unlimited times, and can be created using the value factory method or by operators returning a single value, such us first, last, collect, count, min, max, reduce, sum. See example below:

ch=Channel.value("hello nextflow")
ch.view()
ch.view()

In general, a process’s architecture is as follows:

process <name> {
    [directives]
    input:
    <process inputs>
    output:
    <process outputs>
    when:
    <condition>
    [script|shell|exec]:
    <user script>
}

directives, input, output, and when are all optional.

script

A process contains one and only one script block, and must be the last statement.

In default, the script block can be a mix of groovy languages and strings, and the strings are explained as bash, but any language can be specified by adding a Shebang declaration at the begining of string. Actually, any groovy statements can be put before the ‘script:’ block, which can be used to define some variables (need def keyword) or output information. See groovy document here. Also see the nextflow’s introduction on scripting at here, which includes some operations of strings and files such as file.copyTo().

Strings can be defined using single or double quote, and multi-line strings can use triple quotes. Note that single-quote prevents interpolation of pipeline variables.

An alternative keyword to ‘script’ is ‘shell’. They are identical, except for the following:

  1. using single quote to enclose the code; otherwise dollar variable is explained as pipeline variable.

  2. using !{var} to refer to pipeline variable, and use ${sys} to refer to environment variable.

Finally, one can also use the keyword ‘exec’ instead of ‘script’, to write some code using native nextflow/groovy language, one example is as below:

x = Channel.from( 'a', 'b', 'c')

process simpleSum {
    input:
    val x

    exec:
    println "Hello Mr. $x"
}

To call a user script, the script needs be put into a ‘bin’ subfolder located in the same folder as ‘nextflow.config’, or in the folders in ‘PATH’ environment variable. If in the ‘bin’ subfolder, the folder will also be uploaded to working directory when using awsbatch to execute the pipeline, or add to PATH variable when running locally.

Note that it may fail when running a workflow in a sub-folder and the workflow refers to the executables in the root bin/ folder, because nextflow only upload/add the ‘bin’ folder which is in the same folder as the triggering script. To solve this problem, one may use the variable moduleDir to refer to the bin folder and add it to the PATH environment variable. See the following folder structure as an example:

rootdir |
        |___main.nf (call my_exe)
        |___bin
           |___my_exe
        |___workflow
           |___wf1.nf (call my_exe)

## successful run
nextflow run main.nf
## failed run
nextflow run workflow/wf1.nf
## to fix this problem, the file workflow/wf1.nf need add
## "$moduleDir/../bin" to PATH or use "$moduleDir/../bin/my_exe"
## to refer the executable.    

Note that the above method of setting PATH will fail the pipeline if it is run on AWS batch or alike, because the ‘PATH’ variable always refer to the user’s local environment variable, not the one in task instance. One solution to this is that one can use a condition ${task.executor} == 'local' to test whether the task is run locally, if so, set PATH, otherwise not.

Another solution is to make a symbolic link in the subworkflow folder to the ‘bin’ folder; however, if any script in the bin folder refers to other resources, those resources have to be linked, too.

The next solution can be one include the sub-workflow modules in the main script ‘main.nf’, and then use the option ‘-entry’ to specify certain sub-workflow to execute.

The last solution is one can set env.PATH to include the bin folder in nextflow.config. This will ensure that the executables are accessible in all workflows. Note that the path to the bin folder needs to be absolute path. Note that this will not work for remote executors such as aws batch, because the bin folder won’t be uploaded and thus inaccessible.

To include the nextflow.config file in the project root folder to sub-workflow (workflow/) in this case, and use includeConfig ../nextflow.config to add main configuration file.

Channel factories

factory queue type description
value value bind a single string/list to a channel, and emit it as a whole.
from queue accept a list of values, and emit each element separately, deprecated soon, and should use Channel.of() instead
of queue similar to from, can also handle range values like 1..23
fromList queue accept a list and creates a channel emitting elements
fromPath queue accept a string (possibly with file glob pattern) and return a file list channel. One can add options by appending {opt: value}. The string can contain ’*‘,’?’ wildcards. And to get into sub-folders, use ’**‘, and also consider use option ’maxDepth’ to control the folder levels to search in.
fromFilePairs queue similar to fromPath, but return groups of files for each element, like [id, [file1, file2]]. Option size can be used to return how many files for each record.
fromSRA queue query NCBI SRA using accessions or project IDs and return a list of fastq files.

Input type

The input block can contain one or more inputs declarations, using the following format

input:
    <qualifier> <variable name> from <source channel> 
qualifier source type return type description
val any strings accepts any data type and return string elements
file file path file path accepts file paths from file factories; files are also staged. If inputs are simply strings, then the strings will be the content of a temporary file.
path string file path accept a string and return file object, solves the problem of ‘file’ when using string. Note that the string need be in absolute path format.
tuple composite channel composite elements comma separated vals and files
each a list each list element input repeater, it runs the whole process with each input element (over all other input values for each element by this qualifier).
env string environment variable set input value as environment variable named with specified qualifier name.

input channel combination

If the input block has multiple channels, it can generate combined values, see the examples below:

  input:
  val x from Channel.from(1,2,3)
  val y from Channel.from('a','b','c')
  script:
  """
  echo $x and $y
  """

it emits three pairs (1,a), (2,b), (3,c).

Note: When one channel has fewer elements than the others, the process will end after consuming all the elements in the channel with the fewest elements. However, the number of elements in value channels will not terminate processes, because it can be consumed unlimited times.

IMPORTANT

There are senarios that one channel A has only one element and channel B has multiple, like aligning multiple fastq files to a genome file. In this case, one can use the operator ‘toList()’ or ‘collect()’ to convert A queue channel to a value channel, and this will ensure each element from channel B is run against the only element in channel A. This also works for the channel that emits tuples.

To check a channel type, one can do the following:

// assume there are two channels: ch1 and ch2 for value and queue channel, respectively
println ch1.getClass() // get groovyx.gpars.dataflow.DataflowVariable for value channel
println ch1.getClass() // get groovyx.gpars.dataflow.DataflowQueue for queue channel

Also a input value channel is implicitly created by a process when an input specifies a simple value (not a channel) in the from clause. Moreover, an output value channel is also implicitly created for a process whose inputs are only value channels. The code below will show the cases:

// simple value as input to create input value channel, so can be combined with other multi-value channels as input for other processes.
input:
    path fa from params.fasta

// this will create a queue channel instead, so processes using this channel as input will run only once.
input:
    path fa from Channel.fromPath(params.fasta)

Output

Only output block can be provided for each task, and it can contain one or more declarations. The declaration follows the following format:

output:
    <qualifier> <variable name> into <channel 1>[,<channel 2>, ...]

The qualifiers can be summarized as below:

qualifier source type return type description
val value element value element it sends value elements to output channel
file file names file object sends file objects to output channel
tuple tuple list tuple list it sends a tuple as element to output channel

One can use a file glob format such as ’chunk_*’ to generate a set of files. The glob matching has the following properties:

  1. input files are never matched against.
  2. it matches both files and directories.
  3. if ’**’ is used instead of ’*’, it will cross over directory boundaries and only match files.

One can also put several files into a list, using the format like below (enclosed with braces):

output: path '{f1.txt,f2.txt}' into ch_out

Variables

In nextflow, there are both pipeline variables and environment variables. The former are the variables that are defined in the pipeline, such as params, val x from ch_x, and the latter are defined in the system running the pipeline or in the script block, such as variables for bash, python, perl, etc. Depending on how the commands are quoted in the script block, different methods are needed for access these variables:

  • tri-double quotes: pipeline variable uses prefix ‘$’, such as ‘$params.num’, and environment variables use escape ‘\’, such as ‘\$enVar’.

  • tri-single quotes: pipeline variables are not accessible, and environment variables use ‘$’ prefix, such as ‘$enVar’.

  • use ‘shell’ block instead of ‘script’: pipeline variable uses ‘!{params.num}’, and environment variables use ‘$enVar’. Note that pipeline variable must be put in ‘{}’ and the code block must be quoted using single quotes.

Also if a pipeline variable is enclosed in single quotes when declared, then it will still need a ‘$’ in script block, for example, the following two are equivalent:

params.txt=Channel.from("Hello","NextFlow")
params.txt1=Channel.from("Hello1","NextFlow1")

process quotes{
    input:
        val 'x' from params.txt
    
    script:
    """
    echo $x
    """
}

process noquotes{
    input:
        val x from params.txt1
    
    script:
    """
    echo $x
    # this is a bash comment with unknown variable $y; error will be triggered
    """
}

Note that in the script block, any variable, even the one in bash comments, such as in the last code block, need a definition, otherwise error is triggered.

Also there are predefined variables in nextflow, summarized as below

variableName source description
it channel each item in a channel

Operators on channels

operator target description
view channel print out the content of a channel
flatten
collect
join
map
mix

process control

if-else

The if-else structure follows the same style in C and Java.

params.p="hello"

process ifelse {
        input:
        val x from params.p

        script:
        if(x == 'hello')
                """
                echo This is block 1 $x
                """
        else if(x=="world")
                """
                echo This is block 2 $x
                """
        else
                """
                echo This is a surprise $x
                """
}

It also has ternary expression like ‘a? a > 10 : -1’.

for-loop

It has two structures as below:

for( int i=0; i<3; i++)
{
    println "here is $i"
}
// or using a list
names=['a','b','c']
for(String el : names)
{
    println("my name is $el")
}

when

Conditional execution

params.do = true
process {
    output:
    path 'out.txt' into out_ch
    
    when:
    params.do
    
    // the following code is run only when params.do is true
    script:
    '''
    echo "This is a conditional process" > out.txt
    '''
}

Note that if a process is skipped, then its output is also skipped, so one can’t rely on the output channel.

file staging

nextflow stages files so that they can be cached/reused in future runs. However, if the staging process was interrupted and the file was not correctly staged, it will not report errors in the following runs. Need be very careful. This issue was discussed here. If a pipeline was interrupted forcibly, then one should delete the working directory before resume/rerun the pipeline.

Note that for fromPath, the input can be a folder or file. If it is a folder, the whole folder is staged. When providing inputs on command line, if the path contains wildcard symbols such as ’*’ and ‘?’, these paths need be quoted, otherwise they will be expanded on the command line and only first element is passed into program.

functions

One can define functions in a script as follows:

int sum(int x, int y){
    return(x+y)
}

One can also define closures as below:

double={it * 2} // define a closure to double input. it is the implict variable
double.call(4) // call the closure with argument 4
sum={x,y -> x+y} // two arguments
sum.call(5,6)

Some bultin functions:

function purpose
template load external script into a process. The script should exist in the templates folder, otherwise absolute path is needed.
log.info print out strings, like println
string.stripIndent remove indent spaces from a multi-line string

nextflow running options

option description
-resume continue from where last run stopped
-process.echo print out the result from script blocks
-with-docker use the docker environment setup in the nextflow.config file
-with-conda activate a conda environment by specifying an environment recipe file or its name
-with-report create execution report
-with-trace create file trace.txt containing running information for each task
-with-timeline show the time used by each task.
-with-dag render workflow using direct acyclic graph, needing Graphviz installed.
-w the folder in which tasks are run, default is ‘work’.

Note when specififying parameters, one need use double dashes, such as –greeting nextflow to provide value for params.greeting.

directives

Directives defines optional settings that affects the execution of current process, without affecting the semantic of the task itself. It stays at the top of a process.

A complete list of directives is at https://www.nextflow.io/docs/latest/process.html#directives.

A summary of the major directives is as follows:

directive example explanation
cpus cpus 2 claim the number of cpus to use per task. If this is 2, and the number of tasks running in parallel is 4, then the machine needs 8 cpus.
memory memory ‘8.0 GB’ claim the amount of memory to use
disk disk ‘2.0 GB’ disk amount required for the process.
tag tag “$sampleId” assign a label to a running task
echo echo true let task to print out to standard output
container container ‘image/name’ docker container to be used by the workflow.
publishDir publishDir “/path/to/results”, mode: ‘copy’ send the results to directory, otherwise task-specific output files are deleted upon completion. The directory can be remote, too. One can use –outdir to change this folder. ‘saveAs’ use a closure to accept filename as string and set output filename.
errorStrategy errorStrategy ‘ignore’ how to handle errors occured in a process. Available options are: terminate (terminate the whole pipeline immediately), finish (terminate after submitted jobs completed), ignore (ignore the error), retry (rerun the errored process).
maxErrors maxErrors 3 the number of retries for the whole process across all tasks when errorStrategy is set to ‘retry’.
maxRetries maxRetries 2 the number of retries for a task (different from maxErrors) when errorStrategy is set to ‘retry’.
label label ‘big_mem’

operators

Operators are bultin functions that applied to channels and can be used to transform, filter, fork, and combine channels. The full list of operators is at https://www.nextflow.io/docs/latest/operator.html.

A summary of useful operators:

operator example description
map numCh.map(it -> it*it) transform each element using a function, here to get the squares.
view myCh.view() print out the elements and each is appended a new line.
into myCh.into {aCh; bCh} copy the source channel to each target channel.
mix myCh.mix(aCh,bCh) merge all channels into one.
flatten tupleCh.flatten() tuples in a channel is flattened and each element is emitted separately.
collect myCh.collect() collect all elements into a list and emit as one; the opposite of flatten. Don’t use this on a file path, which will lead to a list of items separated by path separator.
groupTuple tupleCh.groupTuple() group all tuples that have the same key into one tuple.
join leftCh.join(rightCh) like ‘merge’ in R, merge two channels into one based on key match.
branch myCh.branch{small: it < 5; big: it >=5} return a multi-channel object, with labels as keys.
set Channel.from(1,2,34).set(numCh) set channel name
cross srcCh.cross(targetCh) output only the source items whose keys (default: 1st item) have a match in the target channel. Like R’s merge() function.
subscribe ch.subscribe { println “Got: $it” } allow user to define a function to run on each emit element.
collectFile ch.collectFile(name:“outfile.txt”) store all emited elements into the specified file.
combine ch1.combine(ch2) combine two channels in the form of cartesian product, or based on a key given by option ‘by: pos’.
concat ch1.concat(ch2,ch3) concatenate the elements in multiple channels into one.

Groovy language

NextFlow is a DSL implementation on Groovy language, so it can run any Groovy and Java code.

Below is a summary of the major operations by Groovy language:

Comments

The same as C-style syntax, one can use

// single line comments

/* comments
block
*/

Variables

There are global and local variables. Global variables are directly created by assigning a value, such as

x=1
x=-3.14
x="Hello"
x=new java.util.Date()

While local variables are created using keyword def as

def localX="for a closure or function"

lists

In addition to simple variables, there are also lists. They are created by putting elements in a square-bracket. The examples are below:

list=[0,1,2,3]
list[0] // get the first element
list.get(0) // the same thing
list.size() // get the list size
list.reverse() // reverse the list
list[-1] // last element
list.unique() // unique elements
list.min() // minimal element
list.max() // maximum
list.count(3) // count the element '3'
list.sort() // sort elements
list.flatten() // flatten out list elements to single-value elements
list.sum() // total
list.find{it % 2 == 0} // find first even elements
list.findAll{it % 2 == 0} // find all even elements

Maps

Maps are more like dictionaries in other languages. It has all methods implemented in java.util.Map and Groovy API. Below are some frequent operations:

map=[a:1, b:2, c:3] // define a map
map['a'] // get the element with key 'a'
map.a // the same as above
map.get('a') // the same as above
map.put('z', 100) // add a value with key 'z'

Strings

Strings can be defined by enclosing them in single or double quotes. When using double quotes, it can use ‘' for including variables or any expression, like 'var’ or ${command/operation}.

Strings can also be defined using ‘/’ as delimiter, such as ‘/here/’, often used for defining regular expressions. These are called slashy strings.

Slashy strings can have multiple lines. Another way to have multi-line strings is using triple single and double quotes. Note that slashy strings and double-quote strings support variable interpolation.

File operations

  • Convert a string to a file object: myFile=file(‘path/to/file’)

  • Get a file content as text: myFile.text

  • Get a file content as byte array for binary data: myFile.bytes

  • Save binary data to a file: myFile.bytes = binaryBuffer

  • Add new content to a file: myFile.append(‘line 1’)

  • Read lines as an array of strings: myFile.readLines()

NextFlow configuration

The documentation can be found here https://www.nextflow.io/docs/latest/config.html

nextflow.config

This file hosts the settings for pipelines, and it may appear in multiple locations. Here are the locations that a nextflow script searches for settings in the order of decreasing priority (top ones override bottom ones):

  1. values provided with command line as –option ‘value’.

  2. values set in file provided by -params-file.

  3. values set in file provided by -c ‘config-file’.

  4. file nextflow.config in current folder.

  5. file nextflow.config in the workflow project folder.

  6. file $HOME/.nextflow/config.

  7. values defined in pipeline script itself.

If one doesn’t want to use default nextflow.config files mentioned above, but only user-specified one, use the option ‘-C user.config’ (note capitalized C is used here).

One config file can include other config files using the keyword includeConfig.

Note that in a config file, a value set later will override the value set earlier, including the sections imported with the keyword includeConfig and defined in profiles. Therefore, it is very important to make sure the order of the sections match the need of priority expectation: put high-priority settings at later sections.

Also when specifying multiple profiles like ‘-profile a,b’, then the profile defined later has higher priority than the one defined earlier. So if profile ‘a’ is defined later, then its setting has higher priority than ’b’s.

syntax

The syntax to define variales in config files are as follows:

name = value

  1. Note that value has type: characters should be quoted, while numbers and logical (true or false) should not be quoted.

  2. The variables defined in config file can be used in other variables in the format of $configVar or ${configExp}.

  3. The environment variables defined in host such as $PATH, $PWD, etc can also be used in config file.

  4. The same comment syntax is used in config file, i.e., ‘//’ or ’/* comment */’.

  5. variables can be in a scope by adding a prefix, such as ‘params.x’. There are two ways to define variables in a scope,

params.x="way 1"

params {
    x="way 2"
    y="extra"
}

Note that variables defined in the same scope can be used directly without prefix and override any variables defined outside, for example:

params.X = "outsideX"

params {
    X = "globalX"
    
    vars {
        X = "localX"
        Y = "This $X is always 'localX', overriding globalX"
        Yp = "This ${params.X} is affected by input parameters, default is 'globalX'"
    }
}

// test with the following code
println ${params.vars.X}
println ${params.vars.Y}
println ${params.vars.Yp}

Major scopes/variables

scope description
params Define parameters used in script. The precedence for parameters are as follows: command line > nextflow.config > workflow script.
env export variables to the environment where workflow tasks to execute.
process set process directives, such as cpus, memory, and container.

Important variables

variable scope example description
executor process process.executor = “awsbatch” set the deployment cluster
queue process process.queue = “myqueue” set the queue in the cluster/environment
cpus process cpus = 4 the number of cpus for a task execution
memory process memory = ‘10 GB’ the needed memory for a task execution
disk process disk = “100 GB” the disk storage required by a task
workDir N/A workDir=“s3://aws/data/dir” the directory where tasks temporary files are created, one can use ‘-w /new/path’ to set it on cmdline
region aws aws.region = “us-east-1” the AWS region for awsbatch
batch.cliPath aws aws.batch.cliPath = ‘/home/ec2-user/miniconda/bin/aws’ path to ‘aws’ command
batch.volumes aws aws.batch.volumes = [‘/tmp’, ‘/host/path:/mnt/path:ro’] attach host volumes to docker; the setting applies to all processes
baseDir global The directory where the main workflow script is located (deprecated in favour of projectDir since 20.04.0).
launchDir global The directory where the workflow is launched (requires version 20.04.0 or later).
moduleDir global The directory where a module/process script is located for DSL2 modules or the same as projectDir for a non-module script (requires version 20.04.0 or later).
nextflow global Dictionary like object representing nextflow runtime information (see Nextflow metadata).
params global Dictionary like object holding workflow parameters specifing in the config file or as command line options.
projectDir global The directory where the main script is located (requires version 20.04.0 or later).
workflow global Dictionary like object representing workflow runtime information (see Runtime metadata).

One can use the following variables in nextflow.config to choose and set the environment for running the workflow:

// using docker
process.container="image/name"
docker.enabled = true
// use singularity, can also use library://, shub://, docker://, docker-daemon:// protocols
process.container="path/to/file.sif"
singularity.enabled = true
// configue conda environment, provide an environment or YAML file to build environment
process.conda = "path/to/environment"

Bulk-set attributes

One can use the following ways to set attributes (resources, containers, etc) for processes or pipelines:

  1. “withName: procName {}”: set attributes for processes by name. ‘procName’ can be regular expression such as ‘abc|def’ (matching two processes) and ‘!bar’ (not process ‘bar’).

  2. “withLabel: ‘procLabel’ {}”: set attributes for processes by label, also accept regular expression as above.

  3. profiles: set execution profile (see below) for the whole pipeline.

A profile is a set of configuration attributes that can be activated/chosen when launching a pipeline execution by using the -profile command line option. An example is shown below:

profiles {

    standard {
        params.genome = '/local/path/ref.fasta'
        process.executor = 'local'
    }

    cluster {
        params.genome = '/data/stared/ref.fasta'
        process.executor = 'sge'
        process.queue = 'long'
        process.memory = '10GB'
        process.conda = '/some/path/env.yml'
    }

    cloud {
        params.genome = '/data/stared/ref.fasta'
        process.executor = 'awsbatch'
        process.container = 'cbcrg/imagex'
        docker.enabled = true
    }

}

The standard profile is the default one, and others can be specified with the option ‘-profile’.

Cloud deployment

AWS batch

  1. credentials

    nextflow will find aws credentials in the following order:

    1. Looking for the following environment variables:
      • AWS_ACCESS_KEY_ID
      • AWS_SECRET_ACCESS_KEY
      • AWS_DEFAULT_REGION
    2. read ~/.aws/credentials or ~/.aws/config for such information.
    3. or it can be configured in nextflow config file using variables ‘aws.accessKey’, ‘aws.secretKey’, and ‘aws.region’; look at https://www.nextflow.io/docs/latest/config.html#config-aws
  2. AWS-cli tools

    Nextflow requires to access the AWS command line tool (aws) from the container in which the job runs in order to stage the required input files and to copy back the resulting output files in the S3 storage.

    The aws tool can either be included in container image(s) used by your pipeline execution or installed in a custom AMI that needs to used in place of the default AMI when configuring the Batch Computing environment. For the latter, one can start an EC2 instance and install awscli using conda, and create an AMI from it. Don’t use pip to install awscli, which may not work in container. Finally use

    aws.batch.cliPath = '/home/ec2-user/miniconda/bin/aws'
    

    to specify the path where aws is installed in the AMI.

  3. Nextflow creates job definitions (tasks and containers) and jobs to run on aws batch, but users need create computer environments and awsbatch queues to run the jobs. Each process in the job can use different queue and docker images if necessary.

  4. Docker images

    The container image(s) must be published in a Docker registry that is accessible from the instances run by AWS Batch eg. Docker Hub, Quay or AWS ECS Container Registry.

  5. Container properties When nextflow define a batch job, it sets container properties (see this page). The properties include command to run and cpu/memory limits usable by a container. nextflow may put a very low value for cpus and memory, leading to the container killed when running a job task. It is always safer to specify the memory/cpu resource when using docker image in a nextflow process.

  6. Monitor jobs

    When a job is submitted to an AWS batch queue, one can go to AWS batch queue console to see the status of running jobs. In the log stream page, one can see the progress of a job running. The ‘container’ section shows the command called and the resources such as cpus and memory assigned.

Monitoring workflow

One can monitor the running a pipeline/workflow using the workflow object, such as

println "Project : $workflow.projectDir"
println "Git info: $workflow.repository - $workflow.revision [$workflow.commitId]

The table below lists the properties accessible from the workflow object:

Name Description
scriptId Project main script unique hash ID.
scriptName Project main script file name.
scriptFile Project main script file path.
repository Project repository Git remote URL.
commitId Git commit ID of the executed workflow repository.
revision Git branch/tag of the executed workflow repository.
projectDir Directory where the workflow project is stored in the computer.
launchDir Directory where the workflow execution has been launched.
workDir Workflow working directory.
homeDir User system home directory.
userName User system account name.
configFiles Configuration files used for the workflow execution.
container Docker image used to run workflow tasks. When more than one image is used it returns a map object containing [process name, image name] pair entries.
containerEngine Returns the name of the container engine (e.g. docker or singularity) or null if no container engine is enabled.
commandLine Command line as entered by the user to launch the workflow execution.
profile Used configuration profile.
runName Mnemonic name assigned to this execution instance.
sessionId Unique identifier (UUID) associated to current execution.
resume Returns true whenever the current instance is resumed from a previous execution.
start Timestamp of workflow at execution start.
manifest Entries of the workflow manifest.
✝ complete Timestamp of workflow when execution is completed.
✝ duration Time elapsed to complete workflow execution.
* success Reports if the execution completed successfully.
* exitStatus Exit status of the task that caused the workflow execution to fail.
* errorMessage Error message of the task that caused the workflow execution to fail.
* errorReport Detailed error of the task that caused the workflow execution to fail.

Also there are two handlers to handle the workflow:

Handler Description
workflow.onComplete what to do when the pipeline complete
workflow.onError what to do when error occured

DSL2

DSL2 is an extension of previous syntax, to enable it, one need put nextflow.enable.dsl=2 at the beginning of the workflow script.

The DSL2 has the following features:

Functions

def <function name> ( arg1, arg, .. ) {
    <function body>
}

Functions return the result of the last statement. To explicitly return a result, use the keyword return.

Note that functions don’t accept default arguments, so one has to call a function with all arguments all the time. For example, the following doesn’t work:

def f(a, b=10) { a + b }
// you can't call
f(1)
// you need call
f(1, 20)

Process

The new syntax is the same as the old one in defining process, and the only difference is to omit the ‘from’ and ‘into’ channel declaration, and then the process will be invoked in the workflow scope just like a function (so no invocation during the definition of process). Note that a process component can be invoked only once in the same workflow context.

An example is as follows:

nextflow.enable.dsl=2

process foo {
    output:
      path 'foo.txt'
    script:
      """
      your_command > foo.txt
      """
}

 process bar {
    input:
      path x
    output:
      path 'bar.txt'
    script:
      """
      another_command $x > bar.txt
      """
}

workflow {
    data = channel.fromPath('/some/path/*.txt')
    foo()
    bar(data)
}

In this way, one can use process composition like proc1(proc2()) as long as proc2’s output matches the requirement of proc1’s input. The output a process can be reached like proc.out. When a process defines two or more output channels, each of them can be accessed using the array element operator e.g. out[0], out[1], etc. or using named outputs.

To name a output channel which can be used in external scope, one can use syntax like path '*.bam', emit: samples_bam.

Workflow

one can use the workflow keyword to define sub-workflow, which can be called in other workflow definitions. See the below example:

nextflow.enable.dsl=2
// define process
process abc {
    output:
        val x
    
    script:
        x="Excellent nextflow"
}

process ddd {
    input:
        val x
    output:
        stdout
    
    script:
        """
        echo "Hello world: $x"
        """
}

// define sub-workflow component
workflow sub_flow {
    word=abc()
    res=ddd(word)
    res.view()
}

// triger workflow
workflow {
    sub_flow()
}

A workflow definition which does not declare any name is assumed to be the main workflow and it’s implicitly executed. Therefore it’s the entry point of the workflow application. An alternative workflow entry can be specified using the -entry command line option.

To refer to a process in a workflow component, one can use the format like ‘sub_flow:abc’, here it refers to the ‘abc’ process in workflow ‘sub_flow’. This reference can be used in nextflow.config file to setup configurations for a specific process, especially when one process is invoked in multiple workflows.

A workflow may contain the following 4 main segments: take, main, and emit. An example as follows:

include { foo } from '../some/module.nf'

workflow {
    // use take to list input channels
    take:
        x
        y
    // main contains the running processes/workflows
    main:
        foo(x,y)
    // emit to set output channels
    emit:
        foo.out
        res = foo.out // a named version
}

Note that when calling a workflow in another script/workflow, one can’t assign it to a variable and then use the variable to access the result. As an example:

process proc1 {
    ...
}

workflow wf1 {
    ...
}

workflow {
    procRes=proc1() // valid for process
    procRes.out.view()
    wf1() // can't assign to a variable
    wf1.out.view() // the only way to view workflow outputs
}

Module

A module can contain the definition of a function, process and workflow definitions. And module can be included or shared among workflow applications. Just like python modules. One example of including a module is as below:

include { foo; bar } from './some/module'

workflow {
    data = channel.fromPath('/some/data/*.txt')
    foo(data)
}

Note all the module paths should start with ‘./’, ‘../’, or ‘/’.

Including a module is equivalent to copy the relevant code from the module into the current pipeline, unless some modifications made (see below). So the included processes/workflows use current pipeline’s configurations.

When including a process from a module file, all code outside processes are also run, so channels and parameters defined outside processes are run, but these channels/parameters are invisible in current workflow, and so not usable. However, including a workflow has no such issue: the variables defined outside included workflows are available in that workflow, so one needn’t worry about that.

When including a module component it’s possible to specify a name alias. This allows the inclusion and the invocation of the same component multiple times in your script using different names: this is essential if one wants to use the same process multiple times in the same workflow.

include { foo } from './some/module'
include { foo as bar } from './other/module'

workflow {
    foo(some_data)
    bar(other_data)
}

Note that when including a module, by default, the parameters defined in current including main script will override those with the same names in the included modules (assuming the main script’s parameters are defined before module inclusion, otherwise the module values will be taken).

To avoid this issue, one can use the option params to specify one or more parameters from the included module and set values, and these values are not affected by the including script’s environment, nor by command line inputs. And for module-specific parameters that are not specified in params, they use the values set in the module, not changeable by including script or command line values.

Let’s take the following module as an example:

nextflow.enable.dsl=2

// this parameter is also defined in including parameter too.
params.globalInput="global input in module"
// the following parameters are module-specific
params.moduleInput="module input"
params.moduleInputNoChange="module input no change"

workflow wf_module {
    log.info """
    In workflow wf_module:
    globalInput: ${params.globalInput}
    moduleInput: ${params.moduleInput}
    moduleInputNoChange: ${params.moduleInputNoChange}
    """
}

And when one uses the following to include the module using the options, we will see some interesting output:

//
nextflow.enable.dsl=2
// global parameter
params.globalInput="global input in main"
include { wf_module as wf1 } from "./wf_module.nf" params (
    globalInput: "global input set in params",
    moduleInput: "module input set in params"
)

workflow {
    wf1()
    // the main script has no access to the two module-specific parameters
    // moduleInput and moduleInputNoChange, unless they are
    // specified in command to implicitly added these two parameters
}

/* expected output (run without command params setting):
    In workflow wf_module:
    globalInput: global input set in params
    moduleInput: module input set in params
    moduleInputNoChange: module input no change // set by module
*/

Another way to set module-specific parameters is to use the option addParams. Different from params, addParams exposes all parameters in the module, even those not specified in addParams. All the parameters set in the included module can be changed by the including script or command line, making it vulnerable to parameter changes, because change in one place may change the value for all processes/workflows using the parameter. Also these added module-specific parameters are not accessible in the including script, too, unless the same-name parameters are specified at the command line.

Let’s have a look at an example. Using the above wf_module as included module:

nextflow.enable.dsl=2

params.globalInput="global input in main"

// use addParams to add user-specific params
include { wf_module as wf2} from "./wf_module.nf" addParams (
    globalInput: "global input added",
    moduleInput: "module input added"
)
workflow {
    wf2()
/* expected output without command line params setting
    In workflow wf_module:
    globalInput: global input in main // not affected by addParams because it was set before this.
    moduleInput: module input added // set by addParams
    moduleInputNoChange: module input no change // use the module value
*/
}

Note that the parameters set for an included process/workflow are bound to that included process name, even a separate inclusion uses a different parameter, below is an example:

nextflow.enable.dsl=2

include { print_msg as msg1 } from \
    '../process/msg.nf' addParams (
        msg: "workflow msg1"
        )


include { print_msg as msg2 } from \
    '../process/msg.nf' addParams (
        msg: "workflow msg2"
        )


workflow {
    msg1() // output: workflow msg1
    msg2() // output: workflow msg2
}

And the process print_msg is simply a process to print out the parameter msg, and defined as below:

params.msg="This is default msg in msg.nf"
process print_msg1 {
    echo true

    script:
    """
    echo ${params.msg}
    """
}

Also note that the command line parameter setting does not overrides that set by addParams(), so if, for the above example, one provides ‘–msg “command line msg”’ on command line, then msg1() and msg2() will still print out old message.

Bug report: 08/01/2022, nextflow version 21.10.6 build 5660. When the parameter names have uppercase letters, the statement in the last paragraph isn’t true, and all the paramters using ‘addParams()’ will be overriden by global or commandline parameters. For example, in the following setting, params.msG is always “message in main”:

params.msG="message in main"

include { print_msg } from \
    'process.nf' addParams (
        msG: "setting a message which will be always overriden by main"
    )

Update: 08/01/2022 This bug remains there in nextflow version 22.04.5

Channel forking

Channel is automatically forked when it is consumed by multiple processes, so no more need into operator.

Pipes

One can use the pipe | and the and & operator in workflow to connect predefined processes and functions. For example:

process foo {
  input: val data
  output: val result
  exec:
    result = "$data world"
}

process bar {
    input: val data
    output: val result
    exec:
      result = data.toUpperCase()
}

workflow {
   channel.from('Hello') | map { it.reverse() } | (foo & bar) | mix | view
}

in the above snippet the channel emitting the Hello is piped with the map which reverses the string value. Then, the result is passed to either foo and bar processes which are executed in parallel. The result is pair of channels whose content is merged into a single channel using the mix operator. Finally the result is printed using the view operator.

Tricks

  1. Methods to convert a groovy map to string

    • Map.inspect(): all strings are quoted, good for strings with spaces
    • Map.toMapString(): convert to a string, similar to Map.inspect(), but strings are not quoted
    • JsonOutput.toJson(myMap): convert to a json string. Note that any value with ‘/’ must be quoted as strings, otherwise JsonOutput will reports the error ‘Unexpected error [StackOverflowError]’.

Caveats

  1. Zero is treated as false in logical test. a=0 if(! a) { // this will be printed println "0 is false in groovy" }

  2. When a file channel is created from a string containing wildcard symbols, the order of the files in the channel is unknown, and may vary among operating systems. Below is an example:

    file_ch = Channel.fromPath('dataDir/{total,perc,meth}*.txt.gz').collect()
    file_ch.view()
    /* output in aws ubuntu instance, and this order may change in other systems
    [meth_cov_matrix_CpG.txt.gz, perc_matrix_CpG.txt.gz, total_cov_matrix_CpG.txt.gz]
    */
    

    To ensure the right file is used, one need find the filenames or emit the files into individual channels.

  3. If an input channel is empty, then the process won’t be run at all, so the whole pipeline will exit silently without error. One solution is that one can provide a default value when a channel is empty, so that the process can run and check the received value and respond accordingly.

  4. The symbol ‘' is a special variable in nextflow for escaping characters. If one wants to use it in shell command, it has to be escaped by using double backslashes’\‘; otherwise nextflow may interpret the script code in an unexpected way. For example, in the following code, double backslashes are used in front of’+’ sign.

    script:
    """
    o="dmr_test.a_vs_b"
    label=\$(echo \$o | sed -e 's/^dmr_[^.]\\+.//');
    echo \$label
    """
    
  5. There are two ways for one to write values into a file: (1) via Channel’s collectFile() operator, (2) via the methods of the Path object. The below are two examples:

    // via Channel
    myStr="hello world"
    Channel.of(myStr).collectFile(name:"/path/to/outfile")
    // via Path
    myFile=file("/path/to/outfile")
    myFile.text=myStr
    

    Note that there are some key differences between the two ways:

    • The Path operation needs the outdir already exists (unless it is an AWS S3 path), otherwise it reports error. The Channel operation has no this issue.

    • If one file is operated by a Channel, this file seems not operable by the Path operation in the same nextflow process.

FAQs

  1. How to feed a program with multiple input files?

    A: Use ==path fs from Channel.fromPath("/path/*.fa").collect()==, where fs will contain all the files.

  2. How to merge all output files into one file?

    A: one can use collectFile() operator to collect all the values emitted from a channel.

  3. How to run a process in parallel?

    A: one can check an FAQ at https://www.nextflow.io/docs/latest/faq.html. Basically, one need create a channel to hold all input values, and then call this channel as input in a process, which will trigger the process on each of the input values.

  4. Nextflow pipeline hangs without completion, why?

    A. Possible reasons: (1) a channel is explicitly created using Channel.create(), and it needs be closed using ch.close(), otherwise the pipeline will not stop.

  5. The cached files are not updated even though input files have changed. Why?

    A. This can happen when input files are localed at AWS S3 and the executor is local and workdir is also in a local directory. In this case, even though files in the remote S3 folders have changed, the pipeline may not update local cache when re-run the pipeline, no matter -resume is used or not.

  6. A nextflow pipeline may not re-run and report error like [UnsupportedOperationException]. Why?

    A. This is more likely caused by existing files in S3 and the pipeline tries to rename it, but this renaming/moving operation in S3 is not supported by nextflow. One well known issue is the trace.txt file. One solution is that add “overwrite=true” to the ‘trace’ scope; or alternatively change trace dir to a different folder when re-running a pipeline. The same issue may exist for report, timeline, and dag; if it is necessary, add “overwrite=true” to those scopes, too.

  7. Can I stop a pipeline run by ‘Ctrl-C’?

    A: Yes, you can, especially when the run hangs. But be prepared that some weird issues may pop up when you resume the run later (with option -resume). Nextflow caches results and reuse them when a run is resumed. However, if a file transfer (staging) or writing is interrupted by a user, the resumed run may pick up these truncated or missing (if applicable) files, yielding wrong results or causing a breakdown. Therefore, ideally one should let a pipeline to finish started tasks before exiting when errors happened, rather than force stopping the run by ‘Ctrl-C’. If weird things happen, try to run pipelines without ‘-resume’, which may fix the issues.

  8. Can I set dynamic label for a process?

    A: If the variables used in dynamic label come from the inputs in the process or the attributes of task, the answer is NO. However, if the variable is defined in the scope outside the process, then it is YES. See the discussion here.

    Actually, all process directives can accept dynamic values (thus input variables) except the following 3 directives: executor, label, and maxForks, so only these 3 directives need variables defined outside process scope. One can find more about dynamic directives here.

  9. How to get store the size of a channel into a variable?

    A: This is actually not doable in nextflow due to the static nature of groovy language. To get the number of files in a channel, and assume the string params.infiles contains the filename string (with wildcards), one can do the following:

    numFiles = file(params.infiles).size()
    // then numFiles can be used as a variable in comparison
    if(numFiles > 100) {
        println "Too many files"
    } else {
        println "Just right"
    }
  10. How to run a pipeline under a different AWS account?

    A: First, one need to setup two files ~/.aws/credentials and ~/.aws/config by following the instruction.

    Say, the aws account has a profile name ‘User1’, then one can run nextflow under this account by setting environment variable AWS_PROFILE=User1. For example:

    AWS_PROFILE=User1  nextflow run /path/to/pipeline

    More details can be found at nextflow page.

  11. How to activate docker to let processes run in container?

    A: The directive process.container specifies the docker container(s) that each process is run. However, this container won’t be used unless certain conditions are met.

    For AWS batch, by default, all processes will be run in a container, so
    no more settings are needed.
    
    For a local run, one need use the command option `-with-docker` to activate the
    running of processes in the specified containers. Alternatively, one
    can set `docker.enabled = true` in nextflow.config to activate this option. However,
    the former allows one to specify a different docker image other than that set
    in nextflow.config.
    
    One can also use the command option `-without-docker` to disable running
    processes in containers.
    
  12. How to fix the error: ‘java.nio.file.AccessDeniedException’?

    A: This error is caused by the no permission to access files/folders created by docker containers. To fix this, one need do one of the following two settings in nextflow.config, so that the created files have the right ownership:

    ```
    // set docker run option; use single quotes
    docker.runOptions='-u $(id -u):$(id -g)'
    // alternatively, one can just set the following
    docker.fixOwnership = true
    ```
    
    Also see this [link](https://github.com/nf-core/tools/issues/336).
    
  13. How to fix error: “Unknown method invocation collect on GroupTupleOp type”?

    A: This error can be caused by a wrong input type to the operator groupTuple(). One situation is the output from the operator splitCsv(). To solve this issue, one can explicitly transform each input element into a list, as shown in the below example:

    ```
    Channel
        .fromPath("test.tsv")
        .splitCsv(header: false, skip: 1, sep: "\t")
        .map{row -> [row[0], row[1], row[2]] } // convert row into list to avoid the error
        .groupTuple(by: 0)
        .view()
    ```
    
  14. If there is a program named ‘exe’ in both container and the ‘bin’ folder, which will be used by the processes?

    A: If there is the same-named program in both container and nextflow project’s ‘bin’ folder, then the one in the ‘bin’ folder will be called.

  15. What caused the following error: ‘java.lang.UnsupportedOperationException: null at com.upplication.s3fs.S3FileSystemProvider.getFileAttributeView(S3FileSystemProvider.java:697)’

    A: This error occurred when I ran the nxf-sesame pipeline via awsbatch and both workdir and outdir are in AWS S3. It turned out that this looks like a bug in nextflow version 22.04.5. After I changed to use the version 22.09.5-edge, the problem is gone.

    So in general, when errors like "UnsupportedOperationException" happens, in
    addition to check the pipeline code, one may also try different nextflow
    versions for debugging.
    
  16. How to accept a groovy variable (e.g., list, map, string) in a nextflow process?

    A: one can pass a groovy variable to a process, and this process needs to handle this variable in groovy space before using it in bash script. Here, we use a Map object as an example.

    ```
    // create a map
    myMap=[a: 1, b:100, c: "a string"]
    // create the process
    process display_map {
    
        debug true
    
        input:
        val info
    
        script:
        println info.getClass()
        //set=info.entrySet()
        // create a string which can be used in bash
        bashCmd=""
        info.each { entry ->
            bashCmd += "$entry.key --> $entry.value\n"
            }
        """
        echo "$bashCmd" # print the string, if it is a command, one can run it directly 
        """
    }   
    
    // start the workflow
    workflow {
        display_map(myMap)
    }
    ```
    

Resources

  1. Nextflow documentation: https://www.nextflow.io/docs/latest
  2. AWS batch deployment: https://www.nextflow.io/docs/latest/awscloud.html#aws-batch
  3. AWS batch permission configuration: https://apeltzer.github.io/post/01-aws-nfcore/
  4. Nextflow training: https://codata-rda-advanced-bioinformatics-2019.readthedocs.io/en/latest/4.Day4.html
---
title: "Notes on NextFlow"
author: "Zhenguo Zhang"
date: "`r format(Sys.time(), '%B %d, %Y')`"
output:
github_document:
toc: true
html_document:
toc: yes
toc_float: true
---
```{r setup, include=F}
library(knitr)
opts_chunk$set(echo=F)
nfLink<-'[NextFlow](https://www.nextflow.io/)'
```
`r nfLink` is a platform to run data analysis pipelines
written in any language. It provides an abstraction layer between a pipeline's
logic and the underlying execution layers. Therefore, the pipeline written
in `r nfLink` is portable.
The implementation of `r nfLink` uses Bash and Groovy language (a super set of Java).
To learn more about Groovy, one can check this [link](https://www.manning.com/books/groovy-in-action-second-edition).
## Installation
There are two ways to install `r nfLink`:
```bash
# 1. installed in current directory
curl -fsSL https://get.nextflow.io | bash
#To install a specific version, one need set environment variable
#`NXF_VER` first, such as
export NXF_VER=22.9.0-edge
curl -fsSL https://get.nextflow.io | bash
# 2. install using Bioconda, may be outdated
conda install -c bioconda nextflow
```
## Run it
The command to run it is as follows:
```
# run through a github repo
nextflow run git-name/repo --opt1 val1
# or a local script
nextflow run local.nf --opt1 val1
```
To run a command in background (continue running even terminate the terminal),
use the option '-bg'. Note that *nohup* can't be used with nextflow because
the pipeline will stopped.
Nextflow pipeline can also be run from public repositories such as github, gitlab, etc.
To do so, one can run in one of the following way:
```
# find the pipeline in public repo host, default github
nextflow run owner/repoName -hub github
# or specify the full path to the repo, then option '-hub' is unneeded.
nextflow run http://github.com/owner/repoName
```
To access private repo, provide `-user` argument and input password when prompted, or
setup SCM file by following the instruction [here](https://www.nextflow.io/docs/latest/sharing.html#scm-configuration-file).
Here are some nextflow commands to manage repositories (see [this doc](https://www.nextflow.io/docs/latest/sharing.html) for details):
command | explanation
--- | :---
list | show all downloaded repos.
info | show info of a downloaded repo.
pull | pull a repo.
view | print out the pipeline script, such as main.nf
clone | clone repo to a target folder
drop | delete a downloaded repo.
## Structure
A nextflow pipeline consists of processes, and each process is block of code
that run a task. A process contains input, output, and the code to process input
and generate output. The running of each process is independent in its own
temporarily generated folder and don't share common state.
Another important component for nextflow is channel. Channel connects output
of preceeding/feeding processes to the input of following/consuming processes.
The consuming processes needn't wait for the completion of the feeding process,
as long as data avaliable in the channel, the consuming processes are started.
Because of this structure, the parallel processing is achieved.
A process can define one or more input and outpur channels, and these channels
connect processes and drive the running of pipelines.
There are two types of channels: **queue** and **value**. The queue channels are
created implicitly by channel factories such as Channel.from() and Channel.fromPath(),
or by process output definitions.
They are asynchronous unidirectional FIFO queues.
**Note: in old nextflow version, a queue channel can only be consumed once**. For example,
the following code will show that the second call of 'ch.view()' would fail.
```
ch=Channel.from(1,2,3)
ch.view()
ch.view()
```
However, a *value* channel, which is bound to a **single value**, can be
consumed unlimited times, and can be created using the `value` factory
method or by operators returning a single value, such us first, last,
collect, count, min, max, reduce, sum. See example below:
```
ch=Channel.value("hello nextflow")
ch.view()
ch.view()
```
In general, a process's architecture is as follows:
```
process <name> {
[directives]
input:
<process inputs>
output:
<process outputs>
when:
<condition>
[script|shell|exec]:
<user script>
}
```
*directives*, *input*, *output*, and *when* are all optional.
## script
A process contains one and only one script block, and must be the last statement.
In default, the script block can be a mix of groovy languages and strings, and the strings
are explained as bash, but any language can be specified
by adding a Shebang declaration at the begining of string.
Actually, any groovy statements can be put before the 'script:' block, which can be
used to define some variables (need `def` keyword) or output information.
See groovy document [here](http://groovy-lang.org/single-page-documentation.html).
Also see the nextflow's introduction on scripting at [here](https://www.nextflow.io/docs/latest/script.html),
which includes some operations of strings and files such as `file.copyTo()`.
Strings can be defined using single or double quote, and multi-line strings can
use triple quotes. Note that single-quote prevents interpolation of pipeline
variables.
An alternative keyword to 'script' is 'shell'. They are identical, except for the following:
1. using single quote to enclose the code; otherwise dollar variable is explained as pipeline variable.
2. using `!{var}` to refer to pipeline variable, and use `${sys}` to refer to environment variable.
Finally, one can also use the keyword 'exec' instead of 'script', to write some code using native
nextflow/groovy language, one example is as below:
```
x = Channel.from( 'a', 'b', 'c')
process simpleSum {
input:
val x
exec:
println "Hello Mr. $x"
}
```
To call a user script, the script needs be put into a 'bin' subfolder
located in the same folder as 'nextflow.config', or in the folders in
'PATH' environment variable. If in the 'bin' subfolder, the folder will
also be uploaded to working directory when using awsbatch to execute
the pipeline, or add to PATH variable when running locally.
Note that
it may fail when running a workflow in a sub-folder and the workflow
refers to the executables in the root bin/ folder, because nextflow
only upload/add the 'bin' folder which is in the same folder as
the triggering script. To solve this problem,
one may use the variable `moduleDir` to refer to the bin folder and add
it to the PATH environment variable. See the following folder structure
as an example:
```
rootdir |
|___main.nf (call my_exe)
|___bin
|___my_exe
|___workflow
|___wf1.nf (call my_exe)
## successful run
nextflow run main.nf
## failed run
nextflow run workflow/wf1.nf
## to fix this problem, the file workflow/wf1.nf need add
## "$moduleDir/../bin" to PATH or use "$moduleDir/../bin/my_exe"
## to refer the executable.
```
Note that the above method of setting PATH will fail the pipeline
if it is run on AWS batch or alike, because the 'PATH' variable
always refer to the user's local environment variable, not the one
in task instance. One solution to this is that one can use a condition
`${task.executor} == 'local'` to test whether the task is run locally,
if so, set PATH, otherwise not.
Another solution is to make a symbolic link in the subworkflow folder
to the 'bin' folder; however, if any script in the bin folder refers
to other resources, those resources have to be linked, too.
The next solution can be one include the sub-workflow modules in the
main script 'main.nf', and then use the option '-entry' to specify
certain sub-workflow to execute.
The last solution is one can set `env.PATH` to include the `bin`
folder in nextflow.config. This will ensure that the executables
are accessible in all workflows. Note that the path to the `bin`
folder needs to be absolute path. Note that this will not work for
remote executors such as aws batch, because the bin folder won't be
uploaded and thus inaccessible.
To include the nextflow.config file in the project root folder
to sub-workflow (workflow/) in this case, and use
`includeConfig ../nextflow.config` to add main configuration file.
## Channel factories
factory | queue type | description
--- | --- | ---
value | value | bind a single string/list to a channel, and emit it as a whole.
from | queue | accept a list of values, and emit each element separately, deprecated soon, and should use Channel.of() instead
of | queue | similar to *from*, can also handle range values like 1..23
fromList | queue | accept a list and creates a channel emitting elements
fromPath | queue | accept a string (possibly with file glob pattern) and return a file list channel. One can add options by appending {opt: value}. The string can contain '*', '?' wildcards. And to get into sub-folders, use '**', and also consider use option 'maxDepth' to control the folder levels to search in.
fromFilePairs | queue | similar to *fromPath*, but return groups of files for each element, like [id, [file1, file2]]. Option `size` can be used to return how many files for each record.
fromSRA | queue | query NCBI SRA using accessions or project IDs and return a list of fastq files.
## Input type
The input block can contain one or more inputs declarations, using the following format
```
input:
<qualifier> <variable name> from <source channel>
```
qualifier | source type | return type | description
--- | --- | --- | ---
val | any | strings | accepts any data type and return string elements
file | file path | file path | accepts file paths from file factories; files are also staged. If inputs are simply strings, then the strings will be the content of a temporary file.
path | string | file path | accept a string and return file object, solves the problem of 'file' when using string. Note that the string need be in absolute path format.
tuple | composite channel | composite elements | comma separated vals and files
each | a list | each list element | input repeater, it runs the whole process with each input element (over all other input values for each element by this qualifier).
env | string | environment variable | set input value as environment variable named with specified qualifier name.
## input channel combination
If the input block has multiple channels, it can generate combined values,
see the examples below:
```
input:
val x from Channel.from(1,2,3)
val y from Channel.from('a','b','c')
script:
"""
echo $x and $y
"""
```
it emits three pairs (1,a), (2,b), (3,c).
**Note**: When one channel has fewer elements than the others, the process will end after consuming
all the elements in the channel with the fewest elements. However, the number of elements
in value channels will not terminate processes, because it can be consumed unlimited times.
**IMPORTANT**
There are senarios that one channel A has only one element and channel B has multiple, like
aligning multiple fastq files to a genome file. In this case, one can use the operator
'toList()' or 'collect()' to convert A `queue` channel to a `value` channel, and this will
ensure each element from channel B is run against the only element in channel A. This also
works for the channel that emits tuples.
To check a channel type, one can do the following:
```nextflow
// assume there are two channels: ch1 and ch2 for value and queue channel, respectively
println ch1.getClass() // get groovyx.gpars.dataflow.DataflowVariable for value channel
println ch1.getClass() // get groovyx.gpars.dataflow.DataflowQueue for queue channel
```
Also a input value channel is implicitly created by a process when an input specifies a simple
value (not a channel) in the `from` clause. Moreover, an output value channel is also implicitly
created for a process whose inputs are only value channels. The code below will show the cases:
```nextflow
// simple value as input to create input value channel, so can be combined with other multi-value channels as input for other processes.
input:
path fa from params.fasta
// this will create a queue channel instead, so processes using this channel as input will run only once.
input:
path fa from Channel.fromPath(params.fasta)
```
## Output
Only output block can be provided for each task, and it can contain one or more
declarations. The declaration follows the following format:
```
output:
<qualifier> <variable name> into <channel 1>[,<channel 2>, ...]
```
The qualifiers can be summarized as below:
qualifier | source type | return type | description
--- | --- | --- | ---
val | value element | value element | it sends value elements to output channel
file | file names | file object | sends file objects to output channel
tuple | tuple list | tuple list | it sends a tuple as element to output channel
One can use a file glob format such as 'chunk_*' to generate a set of files.
The glob matching has the following properties:
1. input files are never matched against.
2. it matches both files and directories.
3. if '**' is used instead of '*', it will cross over directory boundaries and only match files.
One can also put several files into a list, using the format like below (enclosed with braces):
```nextflow
output: path '{f1.txt,f2.txt}' into ch_out
```
## Variables
In nextflow, there are both pipeline variables and environment variables. The former
are the variables that are defined in the pipeline, such as *params*, *val x from ch_x*, and
the latter are defined in the system running the pipeline or in the script block,
such as variables for
bash, python, perl, etc. Depending on how the commands are quoted in the script block,
different methods are needed for access these variables:
* tri-double quotes: pipeline variable uses prefix '\$', such as '\$params.num', and environment
variables use escape '\\', such as '\\\$enVar'.
* tri-single quotes: pipeline variables are not accessible, and environment
variables use '\$' prefix, such as '\$enVar'.
* use 'shell' block instead of 'script': pipeline variable uses '!{params.num}', and environment
variables use '\$enVar'. Note that pipeline variable must be put in '{}' and the code block
must be quoted using single quotes.
Also if a pipeline variable is enclosed in single quotes when declared, then it will
still need a '\$' in script block, for example, the following two are equivalent:
```
params.txt=Channel.from("Hello","NextFlow")
params.txt1=Channel.from("Hello1","NextFlow1")
process quotes{
input:
val 'x' from params.txt
script:
"""
echo $x
"""
}
process noquotes{
input:
val x from params.txt1
script:
"""
echo $x
# this is a bash comment with unknown variable $y; error will be triggered
"""
}
```
Note that in the script block, any variable, even the one in bash comments, such
as in the last code block, need a definition, otherwise error is triggered.
Also there are predefined variables in nextflow, summarized as below
variableName | source | description
--- | --- | ---
it | channel | each item in a channel
## Operators on channels
operator | target | description
--- | --- | ---
view | channel | print out the content of a channel
flatten |
collect |
join |
map |
mix |
## process control
### if-else
The if-else structure follows the same style in C and Java.
```
params.p="hello"
process ifelse {
input:
val x from params.p
script:
if(x == 'hello')
"""
echo This is block 1 $x
"""
else if(x=="world")
"""
echo This is block 2 $x
"""
else
"""
echo This is a surprise $x
"""
}
```
It also has ternary expression like 'a? a > 10 : -1'.
### for-loop
It has two structures as below:
```
for( int i=0; i<3; i++)
{
println "here is $i"
}
// or using a list
names=['a','b','c']
for(String el : names)
{
println("my name is $el")
}
```
### when
Conditional execution
```
params.do = true
process {
output:
path 'out.txt' into out_ch
when:
params.do
// the following code is run only when params.do is true
script:
'''
echo "This is a conditional process" > out.txt
'''
}
```
Note that if a process is skipped, then its output is also skipped,
so one can't rely on the output channel.
## file staging
nextflow stages files so that they can be cached/reused in future runs.
However, if the staging process was interrupted and the file was not correctly
staged, it will not report errors in the following runs. Need be very careful.
This issue was discussed [here](https://github.com/nextflow-io/nextflow/issues/1552).
If a pipeline was interrupted forcibly, then one should delete the working directory
before resume/rerun the pipeline.
Note that for fromPath, the input can be a folder or file. If it is a folder,
the whole folder is staged. When providing inputs on command line, if the path
contains wildcard symbols such as '*' and '?', these paths need be quoted,
otherwise they will be expanded on the command line and only first element is
passed into program.
## functions
One can define functions in a script as follows:
```
int sum(int x, int y){
return(x+y)
}
```
One can also define closures as below:
```
double={it * 2} // define a closure to double input. it is the implict variable
double.call(4) // call the closure with argument 4
sum={x,y -> x+y} // two arguments
sum.call(5,6)
```
Some bultin functions:
function | purpose
--- | ---
template | load external script into a process. The script should exist in the templates folder, otherwise absolute path is needed.
log.info | print out strings, like println
string.stripIndent | remove indent spaces from a multi-line string
## nextflow running options
option | description
--- | ---
-resume | continue from where last run stopped
-process.echo | print out the result from script blocks
-with-docker | use the docker environment setup in the nextflow.config file
-with-conda | activate a conda environment by specifying an environment recipe file or its name
-with-report | create execution report
-with-trace | create file trace.txt containing running information for each task
-with-timeline | show the time used by each task.
-with-dag | render workflow using direct acyclic graph, needing Graphviz installed.
-w | the folder in which tasks are run, default is 'work'.
Note when specififying parameters, one need use double dashes, such as
*--greeting nextflow* to provide value for params.greeting.
## directives
Directives defines optional settings that affects the execution of current process,
without affecting the semantic of the task itself. It stays at the top of a process.
A complete list of directives is at https://www.nextflow.io/docs/latest/process.html#directives.
A summary of the major directives is as follows:
directive | example | explanation
--- | --- | ---
cpus | cpus 2 | claim the number of cpus to use per task. If this is 2, and the number of tasks running in parallel is 4, then the machine needs 8 cpus.
memory | memory '8.0 GB' | claim the amount of memory to use
disk | disk '2.0 GB' | disk amount required for the process.
tag | tag "$sampleId" | assign a label to a running task
echo | echo true | let task to print out to standard output
container | container 'image/name' | docker container to be used by the workflow.
publishDir | publishDir "/path/to/results", mode: 'copy' | send the results to directory, otherwise task-specific output files are deleted upon completion. The directory can be remote, too. One can use --outdir to change this folder. 'saveAs' use a closure to accept filename as string and set output filename.
errorStrategy | errorStrategy 'ignore' | how to handle errors occured in a process. Available options are: terminate (terminate the whole pipeline immediately), finish (terminate after submitted jobs completed), ignore (ignore the error), retry (rerun the errored process).
maxErrors | maxErrors 3 | the number of retries for the whole process across all tasks when errorStrategy is set to 'retry'.
maxRetries | maxRetries 2 | the number of retries for a task (different from maxErrors) when errorStrategy is set to 'retry'.
label | label 'big_mem' |
## operators
Operators are bultin functions that applied to channels and can be used to
transform, filter, fork, and combine channels. The full list of operators
is at https://www.nextflow.io/docs/latest/operator.html.
A summary of useful operators:
operator | example | description
--- | --- | ---
map | numCh.map(it -> it*it) | transform each element using a function, here to get the squares.
view | myCh.view() | print out the elements and each is appended a new line.
into | myCh.into {aCh; bCh} | copy the source channel to each target channel.
mix | myCh.mix(aCh,bCh) | merge all channels into one.
flatten | tupleCh.flatten() | tuples in a channel is flattened and each element is emitted separately.
collect | myCh.collect() | collect all elements into a list and emit as one; the opposite of flatten. Don't use this on a file path, which will lead to a list of items separated by path separator.
groupTuple | tupleCh.groupTuple() | group all tuples that have the same key into one tuple.
join | leftCh.join(rightCh) | like 'merge' in R, merge two channels into one based on key match.
branch | myCh.branch{small: it < 5; big: it >=5} | return a multi-channel object, with labels as keys.
set | Channel.from(1,2,34).set(numCh) | set channel name
cross | srcCh.cross(targetCh) | output only the source items whose keys (default: 1st item) have a match in the target channel. Like R's merge() function.
subscribe | ch.subscribe { println "Got: $it" } | allow user to define a function to run on each emit element.
collectFile | ch.collectFile(name:"outfile.txt") | store all emited elements into the specified file.
combine | ch1.combine(ch2) | combine two channels in the form of cartesian product, or based on a key given by option 'by: pos'.
concat | ch1.concat(ch2,ch3) | concatenate the elements in multiple channels into one.
## Groovy language
`r nfLink` is a DSL implementation on Groovy language, so it can run
any Groovy and Java code.
Below is a summary of the major operations by Groovy language:
### Comments
The same as C-style syntax, one can use
```
// single line comments
/* comments
block
*/
```
### Variables
There are global and local variables. Global variables are directly created by
assigning a value, such as
```
x=1
x=-3.14
x="Hello"
x=new java.util.Date()
```
While local variables are created using keyword *def* as
```
def localX="for a closure or function"
```
#### lists
In addition to simple variables, there are also lists. They are created by
putting elements in a square-bracket. The examples are below:
```
list=[0,1,2,3]
list[0] // get the first element
list.get(0) // the same thing
list.size() // get the list size
list.reverse() // reverse the list
list[-1] // last element
list.unique() // unique elements
list.min() // minimal element
list.max() // maximum
list.count(3) // count the element '3'
list.sort() // sort elements
list.flatten() // flatten out list elements to single-value elements
list.sum() // total
list.find{it % 2 == 0} // find first even elements
list.findAll{it % 2 == 0} // find all even elements
```
#### Maps
Maps are more like dictionaries in other languages. It has all methods implemented
in [java.util.Map](https://docs.oracle.com/javase/8/docs/api/java/util/Map.html)
and [Groovy API](http://docs.groovy-lang.org/latest/html/groovy-jdk/java/util/Map.html).
Below are some frequent operations:
```
map=[a:1, b:2, c:3] // define a map
map['a'] // get the element with key 'a'
map.a // the same as above
map.get('a') // the same as above
map.put('z', 100) // add a value with key 'z'
```
### Strings
Strings can be defined by enclosing them in single or double quotes. When
using double quotes, it can use '$' for including variables or any expression,
like '$var' or ${command/operation}.
Strings can also be defined using '/' as delimiter, such as '/here\tis\ta\tpattern/',
often used for defining regular expressions. These are called slashy strings.
Slashy strings can have multiple lines. Another way to have multi-line strings is
using triple single and double quotes. Note that slashy strings and double-quote
strings support variable interpolation.
### File operations
- Convert a string to a file object: myFile=file('path/to/file')
- Get a file content as text: myFile.text
- Get a file content as byte array for binary data: myFile.bytes
- Save binary data to a file: myFile.bytes = binaryBuffer
- Add new content to a file: myFile.append('line 1\nline 2\n')
- Read lines as an array of strings: myFile.readLines()
## NextFlow configuration
The documentation can be found here https://www.nextflow.io/docs/latest/config.html
### nextflow.config
This file hosts the settings for pipelines, and it may appear in multiple locations.
Here are the locations that a nextflow script searches for settings in the order of
decreasing priority (top ones override bottom ones):
1. values provided with command line as --option 'value'.
2. values set in file provided by -params-file.
3. values set in file provided by -c 'config-file'.
4. file `nextflow.config` in current folder.
5. file `nextflow.config` in the workflow project folder.
6. file $HOME/.nextflow/config.
7. values defined in pipeline script itself.
If one doesn't want to use default nextflow.config files mentioned above, but only user-specified
one, use the option '-C user.config' (note capitalized *C* is used here).
One config file can include other config files using the keyword *includeConfig*.
**Note** that in a config file, a value set later will override the value set earlier,
including the sections imported with the keyword *includeConfig* and defined in
*profiles*. Therefore, it is very important to make sure the order of the sections
match the need of priority expectation: put high-priority settings at later sections.
Also when specifying multiple profiles like '-profile a,b', then the profile defined later
has higher priority than the one defined earlier. So if profile 'a' is defined later, then
its setting has higher priority than 'b's.
### syntax
The syntax to define variales in config files are as follows:
> name = value
1. Note that value has type: characters should be quoted, while numbers and logical (true or false)
should not be quoted.
2. The variables defined in config file can be used in other variables in the format of $configVar
or ${configExp}.
3. The environment variables defined in host such as $PATH, $PWD, etc can also be used in config file.
4. The same comment syntax is used in config file, i.e., '//' or '/* comment */'.
5. variables can be in a scope by adding a prefix, such as 'params.x'. There are two ways to define
variables in a scope,
```
params.x="way 1"
params {
x="way 2"
y="extra"
}
```
Note that variables defined in the same scope can be used directly without prefix and override any variables
defined outside, for example:
```
params.X = "outsideX"
params {
X = "globalX"
vars {
X = "localX"
Y = "This $X is always 'localX', overriding globalX"
Yp = "This ${params.X} is affected by input parameters, default is 'globalX'"
}
}
// test with the following code
println ${params.vars.X}
println ${params.vars.Y}
println ${params.vars.Yp}
```
Major scopes/variables
scope | description
--- | ---
params | Define parameters used in script. The precedence for parameters are as follows: command line > nextflow.config > workflow script.
env | export variables to the environment where workflow tasks to execute.
process | set process directives, such as cpus, memory, and container.
### Important variables
variable | scope | example | description
--- | --- | --- | ---
executor | process | process.executor = "awsbatch" | set the deployment cluster
queue | process | process.queue = "myqueue" | set the queue in the cluster/environment
cpus | process | cpus = 4 | the number of cpus for a task execution
memory | process | memory = '10 GB' | the needed memory for a task execution
disk | process | disk = "100 GB" | the disk storage required by a task
workDir | N/A | workDir="s3://aws/data/dir" | the directory where tasks temporary files are created, one can use '-w /new/path' to set it on cmdline
region | aws | aws.region = "us-east-1" | the AWS region for awsbatch
batch.cliPath | aws | aws.batch.cliPath = '/home/ec2-user/miniconda/bin/aws' | path to 'aws' command
batch.volumes | aws | aws.batch.volumes = ['/tmp', '/host/path:/mnt/path:ro'] | attach host volumes to docker; the setting applies to all processes
baseDir | global | |The directory where the main workflow script is located (deprecated in favour of projectDir since 20.04.0).
launchDir | global | | The directory where the workflow is launched (requires version 20.04.0 or later).
moduleDir | global | | The directory where a module/process script is located for DSL2 modules or the same as projectDir for a non-module script (requires version 20.04.0 or later).
nextflow | global | | Dictionary like object representing nextflow runtime information (see Nextflow metadata).
params | global | | Dictionary like object holding workflow parameters specifing in the config file or as command line options.
projectDir | global | | The directory where the main script is located (requires version 20.04.0 or later).
workflow | global | | Dictionary like object representing workflow runtime information (see Runtime metadata).
One can use the following variables in nextflow.config to choose and set the environment for
running the workflow:
```
// using docker
process.container="image/name"
docker.enabled = true
// use singularity, can also use library://, shub://, docker://, docker-daemon:// protocols
process.container="path/to/file.sif"
singularity.enabled = true
// configue conda environment, provide an environment or YAML file to build environment
process.conda = "path/to/environment"
```
### Bulk-set attributes
One can use the following ways to set attributes (resources, containers, etc) for processes or pipelines:
1. "withName: procName {}": set attributes for processes by name. 'procName' can be regular expression
such as 'abc|def' (matching two processes) and '!bar' (not process 'bar').
2. "withLabel: 'procLabel' {}": set attributes for processes by label, also accept regular expression as above.
3. profiles: set execution profile (see below) for the whole pipeline.
A profile is a set of configuration attributes that can be activated/chosen when
launching a pipeline execution by using the -profile command line option.
An example is shown below:
```
profiles {
standard {
params.genome = '/local/path/ref.fasta'
process.executor = 'local'
}
cluster {
params.genome = '/data/stared/ref.fasta'
process.executor = 'sge'
process.queue = 'long'
process.memory = '10GB'
process.conda = '/some/path/env.yml'
}
cloud {
params.genome = '/data/stared/ref.fasta'
process.executor = 'awsbatch'
process.container = 'cbcrg/imagex'
docker.enabled = true
}
}
```
The *standard* profile is the default one, and others can be specified with the option '-profile'.
## Cloud deployment
### AWS batch
1. credentials
nextflow will find aws credentials in the following order:
(1) Looking for the following environment variables:
* AWS_ACCESS_KEY_ID
* AWS_SECRET_ACCESS_KEY
* AWS_DEFAULT_REGION
(2) read ~/.aws/credentials or ~/.aws/config for such information.
(3) or it can be configured in nextflow config file using variables 'aws.accessKey',
'aws.secretKey', and 'aws.region'; look at https://www.nextflow.io/docs/latest/config.html#config-aws
2. AWS-cli tools
Nextflow requires to access the AWS command line tool (aws) from the container
in which the job runs in order to stage the required input files and to copy
back the resulting output files in the S3 storage.
The aws tool can either be included in container image(s) used by your pipeline
execution or installed in a custom AMI that needs to used in place of the default
AMI when configuring the Batch Computing environment. For the latter, one can start
an EC2 instance and install awscli using conda, and create an AMI from it. Don't
use pip to install awscli, which may not work in container. Finally use
```
aws.batch.cliPath = '/home/ec2-user/miniconda/bin/aws'
```
to specify the path where aws is installed in the AMI.
3. Nextflow creates job definitions (tasks and containers) and jobs to run on aws batch,
but users need create computer environments and awsbatch queues to run the jobs.
Each process in the job can use different queue and docker images if necessary.
4. Docker images
The container image(s) must be published in a Docker registry that is accessible
from the instances run by AWS Batch eg. Docker Hub, Quay or AWS ECS Container Registry.
5. Container properties
When nextflow define a batch job, it sets container properties
(see [this page](https://docs.aws.amazon.com/batch/latest/APIReference/API_ContainerProperties.html)).
The properties include command to run and cpu/memory limits usable by a container.
nextflow may put a very low value for cpus and memory, leading to the container
killed when running a job task.
It is
always safer to specify the memory/cpu resource when using docker image in a nextflow
process.
6. Monitor jobs
When a job is submitted to an AWS batch queue, one can go to AWS batch queue console
to see the status of running jobs. In the log stream page, one can see the progress
of a job running. The 'container' section shows the command called and the resources
such as cpus and memory assigned.
## Monitoring workflow
One can monitor the running a pipeline/workflow using the *workflow* object,
such as
```
println "Project : $workflow.projectDir"
println "Git info: $workflow.repository - $workflow.revision [$workflow.commitId]
```
The table below lists the properties accessible from the *workflow* object:
Name | Description
--- | ---
scriptId | Project main script unique hash ID.
scriptName | Project main script file name.
scriptFile | Project main script file path.
repository | Project repository Git remote URL.
commitId | Git commit ID of the executed workflow repository.
revision | Git branch/tag of the executed workflow repository.
projectDir | Directory where the workflow project is stored in the computer.
launchDir | Directory where the workflow execution has been launched.
workDir | Workflow working directory.
homeDir | User system home directory.
userName | User system account name.
configFiles | Configuration files used for the workflow execution.
container | Docker image used to run workflow tasks. When more than one image is used it returns a map object containing [process name, image name] pair entries.
containerEngine | Returns the name of the container engine (e.g. docker or singularity) or null if no container engine is enabled.
commandLine | Command line as entered by the user to launch the workflow execution.
profile | Used configuration profile.
runName | Mnemonic name assigned to this execution instance.
sessionId | Unique identifier (UUID) associated to current execution.
resume | Returns true whenever the current instance is resumed from a previous execution.
start | Timestamp of workflow at execution start.
manifest | Entries of the workflow manifest.
✝ complete | Timestamp of workflow when execution is completed.
✝ duration | Time elapsed to complete workflow execution.
* success | Reports if the execution completed successfully.
* exitStatus | Exit status of the task that caused the workflow execution to fail.
* errorMessage | Error message of the task that caused the workflow execution to fail.
* errorReport | Detailed error of the task that caused the workflow execution to fail.
Also there are two handlers to handle the workflow:
Handler | Description
--- | ---
workflow.onComplete | what to do when the pipeline complete
workflow.onError | what to do when error occured
## DSL2
DSL2 is an extension of previous syntax, to enable it, one need put
`nextflow.enable.dsl=2` at the beginning of the workflow script.
The DSL2 has the following features:
### Functions
```
def <function name> ( arg1, arg, .. ) {
<function body>
}
```
Functions return the result of the last statement. To explicitly
return a result, use the keyword `return`.
Note that functions don't accept default arguments, so one has
to call a function with all arguments all the time. For example,
the following doesn't work:
```nextflow
def f(a, b=10) { a + b }
// you can't call
f(1)
// you need call
f(1, 20)
```
### Process
The new syntax is the same as the old one in defining process, and the only difference
is to omit the 'from' and 'into' channel declaration, and then the process will be invoked
in the workflow scope just like a function (so no invocation during the definition of process).
Note that a process component can be invoked
only once in the same workflow context.
An example is as follows:
```
nextflow.enable.dsl=2
process foo {
output:
path 'foo.txt'
script:
"""
your_command > foo.txt
"""
}
process bar {
input:
path x
output:
path 'bar.txt'
script:
"""
another_command $x > bar.txt
"""
}
workflow {
data = channel.fromPath('/some/path/*.txt')
foo()
bar(data)
}
```
In this way, one can use process composition like proc1(proc2()) as long as proc2's output
matches the requirement of proc1's input. The output a process can be reached like proc.out.
When a process defines two or more output channels, each of them can be accessed using the
array element operator e.g. out[0], out[1], etc. or using named outputs.
To name a output channel which can be used in external scope, one can use syntax
like `path '*.bam', emit: samples_bam`.
### Workflow
one can use the *workflow* keyword to define sub-workflow, which can be called in other
workflow definitions. See the below example:
```
nextflow.enable.dsl=2
// define process
process abc {
output:
val x
script:
x="Excellent nextflow"
}
process ddd {
input:
val x
output:
stdout
script:
"""
echo "Hello world: $x"
"""
}
// define sub-workflow component
workflow sub_flow {
word=abc()
res=ddd(word)
res.view()
}
// triger workflow
workflow {
sub_flow()
}
```
A workflow definition which does not declare any name is assumed to be the main workflow
and it’s implicitly executed. Therefore it’s the entry point of the workflow application.
An alternative workflow entry can be specified using the -entry command line option.
To refer to a process in a workflow component, one can use the format like 'sub_flow:abc',
here it refers to the 'abc' process in workflow 'sub_flow'. This reference can be used
in nextflow.config file to setup configurations for a specific process, especially when
one process is invoked in multiple workflows.
A workflow may contain the following 4 main segments: take, main, and emit. An
example as follows:
```nextflow
include { foo } from '../some/module.nf'
workflow {
// use take to list input channels
take:
x
y
// main contains the running processes/workflows
main:
foo(x,y)
// emit to set output channels
emit:
foo.out
res = foo.out // a named version
}
```
Note that when calling a workflow in another script/workflow, one can't assign it to
a variable and then use the variable to access the result. As an example:
```nextflow
process proc1 {
...
}
workflow wf1 {
...
}
workflow {
procRes=proc1() // valid for process
procRes.out.view()
wf1() // can't assign to a variable
wf1.out.view() // the only way to view workflow outputs
}
```
### Module
A module can contain the definition of a function, process and workflow definitions. And module
can be included or shared among workflow applications. Just like python modules. One example
of including a module is as below:
```nextflow
include { foo; bar } from './some/module'
workflow {
data = channel.fromPath('/some/data/*.txt')
foo(data)
}
```
Note all the module paths should start with './', '../', or '/'.
Including a module is equivalent to copy the relevant code from the module
into the current pipeline, unless some modifications made (see below). So
the included processes/workflows use current pipeline's configurations.
When including a process from a module file, all code outside processes
are also run, so channels and parameters defined outside processes are
run, but these channels/parameters are invisible in current workflow, and
so not usable. However, including a workflow has no such issue: the variables
defined outside included workflows are available in that workflow, so one
needn't worry about that.
When including a module component it’s possible to specify a name alias.
This allows the inclusion and the invocation of the same component multiple times
in your script using different names: this is essential if one wants to use
the same process multiple times in the same workflow.
```nextflow
include { foo } from './some/module'
include { foo as bar } from './other/module'
workflow {
foo(some_data)
bar(other_data)
}
```
Note that when including a module, by default, the parameters defined in current including
main script will override those with the same names in the included modules (assuming the
main script's parameters are defined before module inclusion, otherwise the module values
will be taken).
To avoid this issue, one can use the option *params* to specify one or more
parameters from the included module and set values, and these values are not
affected by the including script's environment, nor by command line inputs. And
for module-specific parameters that are not specified in *params*, they use
the values set in the module, not changeable by including script or command line
values.
Let's take the following module as an example:
```nextflow
nextflow.enable.dsl=2
// this parameter is also defined in including parameter too.
params.globalInput="global input in module"
// the following parameters are module-specific
params.moduleInput="module input"
params.moduleInputNoChange="module input no change"
workflow wf_module {
log.info """
In workflow wf_module:
globalInput: ${params.globalInput}
moduleInput: ${params.moduleInput}
moduleInputNoChange: ${params.moduleInputNoChange}
"""
}
```
And when one uses the following to include the module using the options,
we will see some interesting output:
```nextflow
//
nextflow.enable.dsl=2
// global parameter
params.globalInput="global input in main"
include { wf_module as wf1 } from "./wf_module.nf" params (
globalInput: "global input set in params",
moduleInput: "module input set in params"
)
workflow {
wf1()
// the main script has no access to the two module-specific parameters
// moduleInput and moduleInputNoChange, unless they are
// specified in command to implicitly added these two parameters
}
/* expected output (run without command params setting):
In workflow wf_module:
globalInput: global input set in params
moduleInput: module input set in params
moduleInputNoChange: module input no change // set by module
*/
```
Another way to set module-specific parameters is to use the option *addParams*.
Different from *params*, *addParams* exposes all parameters in the module,
even those not specified in *addParams*. All the parameters set in the included
module can be changed by the including script or command line, making it vulnerable
to parameter changes, because change in one place may change the value for all
processes/workflows using the parameter. Also these added module-specific parameters
are not accessible in the including script, too, unless the same-name parameters
are specified at the command line.
Let's have a look at an example. Using the above *wf_module* as included
module:
```
nextflow.enable.dsl=2
params.globalInput="global input in main"
// use addParams to add user-specific params
include { wf_module as wf2} from "./wf_module.nf" addParams (
globalInput: "global input added",
moduleInput: "module input added"
)
workflow {
wf2()
/* expected output without command line params setting
In workflow wf_module:
globalInput: global input in main // not affected by addParams because it was set before this.
moduleInput: module input added // set by addParams
moduleInputNoChange: module input no change // use the module value
*/
}
```
Note that the parameters set for an included process/workflow are
bound to that included process name, even a separate inclusion uses
a different parameter, below is an example:
```
nextflow.enable.dsl=2
include { print_msg as msg1 } from \
'../process/msg.nf' addParams (
msg: "workflow msg1"
)
include { print_msg as msg2 } from \
'../process/msg.nf' addParams (
msg: "workflow msg2"
)
workflow {
msg1() // output: workflow msg1
msg2() // output: workflow msg2
}
```
And the process `print_msg` is simply a process to print out
the parameter `msg`, and defined as below:
```
params.msg="This is default msg in msg.nf"
process print_msg1 {
echo true
script:
"""
echo ${params.msg}
"""
}
```
Also note that the command line parameter setting does not overrides that set by
addParams(), so if, for the above example, one provides '--msg "command line msg"'
on command line, then msg1() and msg2() will still print out old message.
Bug report: 08/01/2022, nextflow version 21.10.6 build 5660.
When the parameter names have uppercase letters, the statement
in the last paragraph isn't true, and all the paramters using 'addParams()'
will be overriden by global or commandline parameters. For example,
in the following setting, `params.msG` is always "message in main":
```
params.msG="message in main"
include { print_msg } from \
'process.nf' addParams (
msG: "setting a message which will be always overriden by main"
)
```
Update: 08/01/2022 This bug remains there in nextflow version 22.04.5
### Channel forking
Channel is automatically forked when it is consumed by multiple processes, so
no more need *into* operator.
### Pipes
One can use the pipe `|` and the *and* `&` operator in workflow to connect predefined processes
and functions. For example:
```
process foo {
input: val data
output: val result
exec:
result = "$data world"
}
process bar {
input: val data
output: val result
exec:
result = data.toUpperCase()
}
workflow {
channel.from('Hello') | map { it.reverse() } | (foo & bar) | mix | view
}
```
in the above snippet the channel emitting the Hello is piped with the map
which reverses the string value. Then, the result is passed to either
foo and bar processes which are executed in parallel. The result is pair of channels
whose content is merged into a single channel using the mix operator.
Finally the result is printed using the view operator.
## Tricks
1. Methods to convert a groovy map to string
- Map.inspect(): all strings are quoted, good for strings with spaces
- Map.toMapString(): convert to a string, similar to `Map.inspect()`, but strings are not quoted
- JsonOutput.toJson(myMap): convert to a json string. Note that any value
with '/' must be quoted as strings, otherwise JsonOutput
will reports the error 'Unexpected error [StackOverflowError]'.
## Caveats
1. Zero is treated as false in logical test.
```
a=0
if(! a) {
// this will be printed
println "0 is false in groovy"
}
```
2. When a file channel is created from a string containing wildcard symbols, the
order of the files in the channel is unknown, and may vary among operating systems.
Below is an example:
```
file_ch = Channel.fromPath('dataDir/{total,perc,meth}*.txt.gz').collect()
file_ch.view()
/* output in aws ubuntu instance, and this order may change in other systems
[meth_cov_matrix_CpG.txt.gz, perc_matrix_CpG.txt.gz, total_cov_matrix_CpG.txt.gz]
*/
```
To ensure the right file is used, one need find the filenames or emit the files
into individual channels.
3. If an input channel is empty, then the process won't be run at all, so the
whole pipeline will exit silently without error. One solution is that one
can provide a default value when a channel is empty, so that the process can
run and check the received value and respond accordingly.
4. The symbol '\' is a special variable in nextflow for escaping characters. If
one wants to use it in shell command, it has to be escaped by using double
backslashes '\\'; otherwise nextflow may interpret the script code in an
unexpected way. For example, in the following code, double backslashes are
used in front of '+' sign.
```
script:
"""
o="dmr_test.a_vs_b"
label=\$(echo \$o | sed -e 's/^dmr_[^.]\\+.//');
echo \$label
"""
```
5. There are two ways for one to write values into a file: (1) via Channel's collectFile()
operator, (2) via the methods of the Path object. The below are two examples:
```nextflow
// via Channel
myStr="hello world"
Channel.of(myStr).collectFile(name:"/path/to/outfile")
```
```
// via Path
myFile=file("/path/to/outfile")
myFile.text=myStr
```
Note that there are some key differences between the two ways:
- The *Path* operation needs the outdir already exists (unless
it is an AWS S3 path), otherwise it reports error. The *Channel*
operation has no this issue.
- If one file is operated by a Channel, this file seems not operable
by the *Path* operation in the same nextflow process.
## FAQs
1. How to feed a program with multiple input files?
A: Use ==path fs from Channel.fromPath("/path/*.fa").collect()==, where `fs` will contain all the files.
2. How to merge all output files into one file?
A: one can use collectFile() operator to collect all the values emitted from a channel.
3. How to run a process in parallel?
A: one can check an FAQ at https://www.nextflow.io/docs/latest/faq.html.
Basically, one need create a channel to hold all input values,
and then call this channel as input in a process, which will trigger
the process on each of the input values.
4. Nextflow pipeline hangs without completion, why?
A. Possible reasons: (1) a channel is explicitly created using Channel.create(),
and it needs be closed using ch.close(), otherwise the pipeline will not stop.
5. The cached files are not updated even though input files have changed. Why?
A. This can happen when input files are localed at AWS S3 and the executor is
local and workdir is also in a local directory. In this case, even though
files in the remote S3 folders have changed, the pipeline may not update local
cache when re-run the pipeline, no matter -resume is used or not.
6. A nextflow pipeline may not re-run and report error like [UnsupportedOperationException]. Why?
A. This is more likely caused by existing files in S3 and the pipeline tries to rename it,
but this renaming/moving operation in S3 is not supported by nextflow. One well known issue
is the trace.txt file. One solution is that add "overwrite=true" to the 'trace' scope; or
alternatively change trace dir to a different folder when re-running a pipeline. The same
issue may exist for report, timeline, and dag; if it is necessary, add "overwrite=true"
to those scopes, too.
7. Can I stop a pipeline run by 'Ctrl-C'?
A: Yes, you can, especially when the run hangs. But be prepared that some weird issues
may pop up when you resume the run later (with option -resume). Nextflow caches results
and reuse them when a run is resumed. However, if a file transfer (staging) or writing is interrupted
by a user, the resumed run may pick up these truncated or missing (if applicable) files,
yielding wrong results or causing a breakdown. Therefore, ideally one should let a pipeline
to finish started tasks before exiting when errors happened, rather than force stopping
the run by 'Ctrl-C'. If weird things happen, try to run pipelines without '-resume', which
may fix the issues.
8. Can I set dynamic label for a process?
A: If the variables used in dynamic label come from the inputs in the process
or the attributes of `task`, the answer is NO. However, if the variable is defined
in the scope outside the process, then it is YES. See the discussion
[here](https://github.com/nextflow-io/nextflow/issues/894).
Actually, all process directives can accept dynamic values (thus input variables)
except the following 3 directives: executor, label, and maxForks, so only these
3 directives need variables defined outside process scope. One can find more about
dynamic directives [here](https://www.nextflow.io/docs/latest/process.html#dynamic-directives).
9. How to get store the size of a channel into a variable?
A: This is actually not doable in nextflow due to the static nature of groovy language.
To get the number of files in a channel, and assume the string `params.infiles` contains
the filename string (with wildcards), one can do the following:
```nextflow
numFiles = file(params.infiles).size()
// then numFiles can be used as a variable in comparison
if(numFiles > 100) {
println "Too many files"
} else {
println "Just right"
}
```
10. How to run a pipeline under a different AWS account?
A: First, one need to setup two files `~/.aws/credentials` and
`~/.aws/config` by following the [instruction](https://docs.aws.amazon.com/cli/latest/userguide/cli-configure-profiles.html).
Say, the aws account has a profile name 'User1', then one can run
nextflow under this account by setting environment variable
`AWS_PROFILE=User1`. For example:
```bash
AWS_PROFILE=User1 nextflow run /path/to/pipeline
```
More details can be found at [nextflow page](https://www.nextflow.io/docs/latest/awscloud.html).
11. How to activate docker to let processes run in container?
A: The directive `process.container` specifies the docker container(s) that
each process is run. However, this container won't be used unless certain
conditions are met.
For AWS batch, by default, all processes will be run in a container, so
no more settings are needed.
For a local run, one need use the command option `-with-docker` to activate the
running of processes in the specified containers. Alternatively, one
can set `docker.enabled = true` in nextflow.config to activate this option. However,
the former allows one to specify a different docker image other than that set
in nextflow.config.
One can also use the command option `-without-docker` to disable running
processes in containers.
12. How to fix the error: 'java.nio.file.AccessDeniedException'?
A: This error is caused by the no permission to access files/folders created by
docker containers. To fix this, one need do one of the following two settings in
nextflow.config, so that the created files have the right ownership:
```
// set docker run option; use single quotes
docker.runOptions='-u $(id -u):$(id -g)'
// alternatively, one can just set the following
docker.fixOwnership = true
```
Also see this [link](https://github.com/nf-core/tools/issues/336).
13. How to fix error: "Unknown method invocation `collect` on GroupTupleOp type"?
A: This error can be caused by a wrong input type to the operator
*groupTuple()*. One situation is the output from the operator
*splitCsv()*. To solve this issue, one can explicitly transform
each input element into a list, as shown in the below example:
```
Channel
.fromPath("test.tsv")
.splitCsv(header: false, skip: 1, sep: "\t")
.map{row -> [row[0], row[1], row[2]] } // convert row into list to avoid the error
.groupTuple(by: 0)
.view()
```
14. If there is a program named 'exe' in both container and the 'bin' folder, which will be
used by the processes?
A: If there is the same-named program in both container and nextflow project's 'bin'
folder, then the one in the 'bin' folder will be called.
15. What caused the following error:
'java.lang.UnsupportedOperationException: null
at com.upplication.s3fs.S3FileSystemProvider.getFileAttributeView(S3FileSystemProvider.java:697)'
A: This error occurred when I ran the nxf-sesame pipeline via awsbatch and both
workdir and outdir are in AWS S3. It turned out that this looks like a bug
in nextflow version 22.04.5. After I changed to use the version 22.09.5-edge,
the problem is gone.
So in general, when errors like "UnsupportedOperationException" happens, in
addition to check the pipeline code, one may also try different nextflow
versions for debugging.
16. How to accept a groovy variable (e.g., list, map, string) in a nextflow process?
A: one can pass a groovy variable to a process, and this process needs to handle
this variable in groovy space before using it in bash script. Here, we use
a *Map* object as an example.
```
// create a map
myMap=[a: 1, b:100, c: "a string"]
// create the process
process display_map {
debug true
input:
val info
script:
println info.getClass()
//set=info.entrySet()
// create a string which can be used in bash
bashCmd=""
info.each { entry ->
bashCmd += "$entry.key --> $entry.value\n"
}
"""
echo "$bashCmd" # print the string, if it is a command, one can run it directly
"""
}
// start the workflow
workflow {
display_map(myMap)
}
```
## Resources
1. Nextflow documentation: https://www.nextflow.io/docs/latest
2. AWS batch deployment: https://www.nextflow.io/docs/latest/awscloud.html#aws-batch
3. AWS batch permission configuration: https://apeltzer.github.io/post/01-aws-nfcore/
4. Nextflow training: https://codata-rda-advanced-bioinformatics-2019.readthedocs.io/en/latest/4.Day4.html
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment