Skip to content

Instantly share code, notes, and snippets.

@elowy01
Last active November 10, 2023 11:57
Show Gist options
  • Save elowy01/e9995d7ee8d6305930f868a10aeabbe9 to your computer and use it in GitHub Desktop.
Save elowy01/e9995d7ee8d6305930f868a10aeabbe9 to your computer and use it in GitHub Desktop.
nextflow cheat sheet
#Example 1:
#!/usr/bin/env nextflow
params.str = 'Hello world!'
process AFcalc {
"""
echo '${params.str}'
"""
}
//this is necessary to print the output
result.subscribe {
println it.trim()
}
#If we run this script by doing:
nextflow run decorate.nf
//
# Propagating parameters
#!/usr/bin/env nextflow
params.query = "test.py"
query = file(params.query)
process do_cat {
input:
file query
output:
file out_cat
"""
cat $query > out_cat
"""
}
process do_another_thing {
input:
file out_cat
"""
cat $out_cat
"""
}
result.subscribe {
println it.trim()
}
#This script will print out:
Hello world!
#If now we run it by doing:
./bin/nextflow run decorate.nf --str 'Hola mundo'
We are assigning a value for the --str param from the command line
//
#In the second example, we execute a python script that just prints out hello:
#!/usr/bin/env nextflow
params.script = "test.py"
script = file(params.script)
process AFcalc {
input:
file script
"""
python $script
"""
}
result.subscribe {
println it.trim()
}
//
### Configuration files:
Write a file named 'nextflow.config' and put it in the dir where you will run nextflow
#Comments in the config file are written as '//' for a single line, or '/* .. */' to comment out a block spanning multiple lines
//
#Ex1:
#Passing a param to the script directive,
#write the following to 'nextflow.config'
params.script='test.py'
#Then in the file with the analyses we write the following:
#!/usr/bin/env nextflow
script = file(params.script)
process AFcalc {
input:
file script
"""
python $script
"""
}
result.subscribe {
println it.trim()
}
//
/ We named the channel to 'num'
num = Channel.from( 'test1', 'test2', 'test3' )
process basicExample {
input:
val x from num
"echo process job $x"
}
result.subscribe {
println it.trim()
}
/This will print:
[e5/20785b] Submitted process > basicExample (1)
[15/cd4308] Submitted process > basicExample (3)
[71/b0abbe] Submitted process > basicExample (2)
process job test1
process job test3
process job test2
/
#if we want to print the contents of a channel we just need to do:
num = Channel.from( 'test1', 'test2', 'test3' )
num.println()
/
When the val has the same name as the channel from where the data is received, the from part can be omitted.
num = Channel.from( 1, 2, 3 )
process basicExample {
input:
val num
"echo process job $num"
}
/
#example of channel factory
zeroToNine = Channel.from( 0..9 )
process test {
input:
val x from zeroToNine
"""
echo $x
"""
}
/
#the same as above, but creating the channel from a list:
myList = [1,2,3,4]
zeroToNine = Channel.from( myList )
process test {
input:
val x from zeroToNine
"""
echo $x
"""
}
/
#Now, let's create a factory channel from a string composed of several
#comma-separated elements:
chr_str="chr1,chr2,chr3"
chrList = Channel.from( chr_str.split(',') )
process test {
input:
val x from chrList
"""
echo $x
"""
}
//
/ Using a channel to read all files and operate on each particular file
/ In this case, we count the lines for each particular file
files = Channel.fromPath( './*.py' )
process countLines {
input:
file query_file from files
"wc -l ${query_file}"
}
result.subscribe {
println it.trim()
}
/ This will print:
[a1/2d4a9a] Submitted process > countLines (2)
[99/124b71] Submitted process > countLines (1)
1 test.py
1 test1.py
//
# Pass the parameters from one process to the other
#!/usr/bin/env nextflow
params.query = "test.py"
query = file(params.query)
process do_cat {
input:
file query
output:
file out_cat
"""
cat $query > out_cat
"""
}
process do_another_thing {
input:
file out_cat
"""
cat $out_cat
"""
}
result.subscribe {
println it.trim()
}
//
#Create an output channel and write the output to a file named
#'result.txt' that will be placed in a folder inside the 'work' folder
#created by nextflow
process randomNum {
output:
file 'result.txt' into numbers
'''
echo $RANDOM > result.txt
'''
}
numbers.subscribe { println "Received: " + it.text }
//
# creating an output file from parameters used with the script
params.outdir='17_09_2018/out'
out_annot = file(params.outdir + "/annotation.txt")
process randomNum {
"""
echo "hello" > ${out_annot}
"""
}
result.subscribe {
println it.trim()
}
//
#Create an output filename from params and pass the output file to a different process
params.outdir='17_09_2018/out'
out_annot = file(params.outdir + "/annotation.txt")
process randomNum {
"""
echo "hello" > ${out_annot}
"""
}
process printFile {
input:
file out_annot
"""
cat ${out_annot}
"""
}
result.subscribe {
println it.trim()
}
//
# Write some string and variable to the log
out_Annotate=file(params.outdir+"/annot_tab2."+params.region+".txt")
log.info "Hello: ${out_Annotate}"
process Annotate {
"""
python ${params.scripts_dir}/annotate.py --AFcalc ${params.scripts_dir} --phased_vcf ${params.phased_vcf} --sample_panel ${params.sample_panel} --tabix ${params.tabix} --region ${params.region} --pops ${params.pops} --exome ${params.exome} --outdir ${params.outdir} --ann_vcf ${params.ann_vcf}
"""
}
//
# printing a string (Groovy syntax)
println "Hello"
//
# replacing characters in a string
println "aaaaa".replace("a", "b")
//
# replace in action:
cutoff_values=[0.95,0.96]
process test1 {
input:
each cutoff from cutoff_values
output:
file(output_cutoff)
script:
output_cutoff="${cutoff}".replace('.', '_')
"""
touch "${output_cutoff}"
"""
}
# It will create 2 files named 0_95 and 0_96
#2nd example:
cutoff_values=[0.95,0.96]
process test1 {
input:
each cutoff from cutoff_values
output:
file(output_cutoff)
script:
output_cutoff="${cutoff}".replace('.', '_')+".vcf"
"""
touch "${output_cutoff}"
"""
}
# It will create 2 files named 0_95.vcf and 0_96.vcf
//
# running nextflow
#
#cleaning a project
./bin/nextflow clean -f
//
# Executor
#In order to run all processes with LSF, submitting them to a specific queue and requesting a certain number of CPUs and amount of memory, add the following
#to nextflow.config
process {
executor='lsf'
queue='production-rh7'
cpus=1
memory=1.GB
}
/
# Running specific processes using a certain executor:
process runFastqSimpleQA {
/*
An example process
*/
memory '2 GB'
executor 'lsf'
cpus 2
queue 'standard1'
"""
echo "Hello"
"""
}
/
# Factory from file
# 'runs.txt' is split into 2-line chunks and each of the
# chunks is processed by foo. In this factory, each of the lines in the file
# is considered a file and this is why the code in the 'script' part is considered
# a string
Channel
.fromPath('runs.txt')
.splitText(by: 2)
.set{ chunks_ch }
process foo {
echo true
input:
file x from chunks_ch
script:
"""
rev $x | rev
"""
}
/
#Factory from file ,in this case a list of run ids. Each line
#is considered a string
params.index = 'runs.txt'
Channel
.fromPath(params.index)
.splitCsv(header:true)
.map{ row-> row.runId }
.set { runs_ch }
process foo {
input:
val x from runs_ch
script:
"""
echo $x
"""
}
result.subscribe {
println it.trim()
}
//
#propagate a file and a value and pass them to another process
#that will modify the file name
process createFile {
output:
file 'test.txt' into output_f
val 'runid' into output_v
script:
"""
touch test.txt
"""
}
process mvFile {
input:
file output_f
val output_v
"""
mv ${output_f} ${output_v}
"""
}
//
# this factory reads a .csv file having different columns and will create
# a variable for each column that can be used by the process
// Path of the CSV index file that the channel below reads.
params.index = 'runs.txt'
// Build 'samples_ch': the file is parsed as CSV with a header row, so each row's
// fields are addressed by column name (sampleId, read1, read2). Every row is turned
// into a tuple of the sample id plus two file objects.
Channel
.fromPath(params.index)
.splitCsv(header:true)
.map{ row-> tuple(row.sampleId, file(row.read1), file(row.read2)) }
.set { samples_ch }
// Runs once per tuple received from 'samples_ch'.
process foo {
input:
// unpack each tuple into one value and two files, in the order built by map{} above
set sampleId, file(read1), file(read2) from samples_ch
script:
"""
echo your_command --sample $sampleId --reads $read1 $read2
"""
}
// NOTE(review): 'result' is not declared anywhere in this snippet -- confirm it is
// available before relying on this subscription.
result.subscribe {
println it.trim()
}
//
# This workflow is interesting because it propagates
# a file and a value and the value is used by mvFile process
# to name a new file and put this new file into 'results' folder
process createFile {
output:
file 'test.txt' into output_f
val 'runid' into output_v
script:
"""
touch test.txt
"""
}
process mvFile {
publishDir 'results', saveAs:{ filename -> "$filename" }
input:
file output_f
val output_v
output:
file "${output_v}.test1.txt"
"""
mv ${output_f} ${output_v}.test1.txt
"""
}
//
#Saving all files produced in a process to a certain dir:
# This is the difference with respect to publishDir in combination with saveAs,
# which allows saving only some of the files
process foo {
publishDir 'out/'
output:
file 'chunk_*' into letters
'''
printf 'Hola' | split -b 1 - chunk_
'''
}
//
#Saving all files produced in a process to a certain folder without creating the symbolic link
publishDir "result", mode: 'copy', overwrite: true
//
# Use nextflow with Docker
#First, put the following in your nextflow.config:
// Name (and tag) of the Docker image used to run every process
process.container = 'variant_filtering:latest'
// Run the processes inside Docker containers
docker.enabled = true
// Mount $HOME inside the container and start in the current directory.
// Single quotes keep $HOME and $PWD literal so they are not interpolated by the config parser.
docker.runOptions = '--volume $HOME:$HOME --workdir $PWD'
#Then, you run your nextflow workflow as usual:
nextflow -C test.config run test.nf --vcf input10.reheaded.vcf.gz
#Where input10.reheaded.vcf.gz is a file located on your local system
//
# Conditional process (downloaded from https://github.com/nextflow-io/patterns/blob/master/conditional-process.nf)
#!/usr/bin/env nextflow
/*
* Copyright (c) 2018, Centre for Genomic Regulation (CRG).
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/
/*
* author Paolo Di Tommaso <paolo.ditommaso@gmail.com>
*/
// Switch that selects which of the two producer processes runs; false unless overridden.
params.flag = false
// Producer that writes 'foo' to x.txt and sends the file to foo_ch.
// Its 'when' guard lets it run only when params.flag is not set.
process foo {
output:
file 'x.txt' into foo_ch
when:
!params.flag
script:
'''
echo foo > x.txt
'''
}
// Producer that writes 'bar' to x.txt and sends the file to bar_ch.
// Its 'when' guard is the opposite of foo's: it runs only when params.flag is set.
process bar {
output:
file 'x.txt' into bar_ch
when:
params.flag
script:
'''
echo bar > x.txt
'''
}
// Consumer: receives x.txt from whichever producer ran and prints its contents.
process omega {
echo true
input:
// mix() merges foo_ch and bar_ch into one input channel, so omega does not
// need to know which of the two producers was executed
file x from foo_ch.mix(bar_ch)
script:
"""
cat $x
"""
}
# This workflow will execute bar if params.flag=true and foo if params.flag=false
So if you run:
nextflow run conditional_process.nf
You get:
Launching `conditional_process.nf` [shrivelled_lalande] - revision: fab3a727cf
[warm up] executor > local
[6a/87f036] Submitted process > foo
[fb/647063] Submitted process > omega (1)
But if you run:
nextflow run conditional_process.nf --flag
You get:
N E X T F L O W ~ version 18.10.1
Launching `conditional_process.nf` [awesome_sax] - revision: fab3a727cf
[warm up] executor > local
[6c/edf567] Submitted process > bar
[0e/4b6f7d] Submitted process > omega (1)
bar
//
# Nextflow environment variables:
NXF_WORK # to set the work dir
NXF_HOME # to set Nextflow home
NXF_TEMP # to set Nextflow tmp dir
//
#Maximum number of processes to parallelize
process doNotParallelizeIt {
maxForks 1
'''
<your script here>
'''
}
By default this value is equal to the number of CPU cores available minus 1.
//
#Use the errorStrategy directive if you want to instruct Nextflow what to do in case of an error
memory '2 GB'
executor 'lsf'
errorStrategy 'ignore'
//
#setting the working directory from the command line
nextflow -C test.config run test.nf -w /path/to/workingdir
//
#mix operator to mix two channels:
chr_str="chr1,chr2,chr3"
letter_str="a,b,c"
letterList = Channel.from( letter_str.split(','))
chrList = Channel.from( chr_str.split(',') )
process test {
echo true
input:
val x from chrList.mix(letterList)
"""
echo $x
"""
}
# This command will print out:
chr3
chr1
a
b
c
chr2
//
# Combine 2 channels (cartesian product)
chr_str="chr1,chr2,chr3"
letter_str="a,b,c"
letterList = Channel.from( letter_str.split(','))
chrList = Channel.from( chr_str.split(',') )
process test {
echo true
input:
set x,y from chrList.combine(letterList)
"""
echo $x $y
"""
}
# Will print out:
chr1 b
chr2 c
chr2 b
chr3 c
chr3 a
chr1 a
chr3 b
chr1 c
chr2 a
//
# How do I use the same channel multiple times:
// Source channel built from the path(s) given in params.input.
vegetable_datasets = Channel.fromPath(params.input)
// Create two channels from the source channel so that each of the two
// processes below has its own channel to read from.
vegetable_datasets.into { datasets_clustalw; datasets_tcoffee }
// First consumer: reads its inputs from 'datasets_clustalw'.
process clustalw2_align {
input:
file vegetable_fasta from datasets_clustalw
output:
// expected output file: the input's baseName followed by '.aln'
file "${vegetable_fasta.baseName}.aln" into clustalw_alns
script:
"""
clustalw2 -INFILE=${vegetable_fasta}
"""
}
// Second consumer: reads the same files, but from 'datasets_tcoffee'.
process tcoffee_align {
input:
file vegetable_fasta from datasets_tcoffee
output:
// expected output file: the input's baseName followed by '.aln'
file "${vegetable_fasta.baseName}.aln" into tcoffee_alns
script:
"""
t_coffee ${vegetable_fasta}
"""
}
//
#tag
#The tag directive allows you to associate each process execution with a custom label, so that it will be easier to
identify it in the log file or in the trace execution report. For example:
process foo {
tag { code }
input:
val code from 'alpha', 'gamma', 'omega'
"""
echo $code
"""
}
And the log will have:
[6e/28919b] Submitted process > foo (alpha)
[d2/1c6175] Submitted process > foo (gamma)
[1c/3ef220] Submitted process > foo (omega)
/
#tag mixing variable and text:
tag "FASTQC on $sample_id"
//
#duplicating output to two channels within a process:
process get_variant_annotations {
/*
Process to get the variant annotations for training files
and for VCF file to annotate (for a single chromosome in this case).
Each of the two output files is sent to two channels at once.
*/
memory '2 GB'
executor 'local'
queue "${params.queue}"
cpus 1
input:
// no 'from' clause: each input is taken from the channel with the same name
file tp_vcf
file fp_vcf
output:
// listing two channel names after 'into' sends the same file to both channels
file 'TP_annotations.tsv.gz' into tp_annotations_train, tp_annotations_rfe
file 'FP_annotations.tsv.gz' into fp_annotations_train, fp_annotations_rfe
// one bcftools query per input VCF, using the format string in params.annotations;
// the result is compressed with bgzip into the file declared in 'output'
"""
bcftools query -H -f '${params.annotations}' ${tp_vcf} | bgzip -c > TP_annotations.tsv.gz
bcftools query -H -f '${params.annotations}' ${fp_vcf} | bgzip -c > FP_annotations.tsv.gz
"""
}
//
# Initializing an empty channel:
chrs_splitmultiallelic_withchr=Channel.empty()
if (params.region) {
chrList = Channel.from( params.region.split(',') )
chrList.into { chrs_splitmultiallelic_withchr ; chrs_intersecionCallSets; chrs_trainModel; chrs_rfe}
}
# The chrs_splitmultiallelic_withchr channel needs to be initialized outside the if scope
//
##groovy syntax:
/
#print out 'hello' to STDOUT
process test {
script:
println("hello")
"""
echo "hello"
"""
}
//
# defining and initializing a variable and printing to STDOUT the contents
process test {
def replace="hello"
println(replace)
script:
"""
echo "hello"
"""
}
/
#conditional if statement:
# exec is used to execute Nextflow code
process test {
exec:
def replace="hello"
if (replace=="hello") {
println(replace)
}
}
/
#scope of variables:
process test {
exec:
def replace="hello"
if (replace=="hello") {
def bye="bye"
}
println(bye)
}
#Trying to print bye out of scope will throw an error:
ERROR ~ Error executing process > 'test'
Caused by:
Unknown variable 'bye' -- Make sure you didn't misspell it or define somewhere in the script before use it
#In order to fix this one needs to do:
process test {
    exec:
    def replace = "hello"
    // Declare 'bye' in the enclosing scope so it is still visible after the
    // if block; 'def bye = null' is also valid.
    def bye
    if (replace == "hello") {
        bye = "bye"
    }
    println(bye)
}
//
#conditional execution based on a param:
process test {
script:
if (params.flag) {
println("hello")
} else {
println("bye")
}
"""
echo "hello"
"""
}
# if it is run by nextflow run test.nf --flag true then it will print out "hello"
//
#creating a val inside a process that will be output to a channel
process test1 {
output:
val outest into outtest
exec:
outest="hello"
}
outtest.println()
# This will print:
hello
//
process test1 {
output:
val outest into outtest
outest="hello"
script:
"""
echo "hello"
"""
}
outtest.println()
# This will crash with:
ERROR ~ Error executing process > 'test1'
Caused by:
Missing value declared as output parameter: outest
#because outest is not initialized within the script/exec block.
#This will be fixed by doing:
process test1 {
output:
val outest into outtest
script:
outest="hello"
"""
echo "hello"
"""
}
outtest.println()
//
////// Check input parameters //////
if (!params.genome) {
exit 1, "Please specify a genome file"
}
//
#getting the basenames from files:
// Channel emitting every .gz file in the current directory.
// The name must be 'files' because the process below reads 'from files'.
files = Channel.fromPath( './*.gz' )

// Prints the base name of each file received from 'files'.
process printFiles {
    input:
    file query_file from files

    // baseName is the file name without its last extension
    "echo ${query_file.baseName}"
}

result.subscribe {
    println it.trim()
}
//
#with -q we cancel all the messages printed by nextflow at the beginning
nextflow -q run.nf
//
#concat operator
c1 = Channel.from( 1,2,3 )
c2 = Channel.from( 'a','b' )
c3 = Channel.from( 'z' )
c1.concat(c2).concat(c3).println()
# print out in order, so it will print:
1
2
3
a
b
z
//
# I've got the following error after running Nextflow:
Failed to write core dump. Core dumps have been disabled
This seems to be fixed by entering the following command:
ulimit -c unlimited
//
Restarting an older run that has failed:
> nextflow log
TIMESTAMP DURATION RUN NAME STATUS REVISION ID SESSION ID COMMAND
2017-11-24 18:41:34 672ms ecstatic_noether OK bab98280bf 7a8fefda-c812-4842-9248-2fd1b8d1d1e1 nextflow run <your pipeline>
2017-11-29 10:55:15 2.5s grave_lavoisier OK 6a1acf3211 56c9a1a1-ad16-4671-b98a-96adbd5051f2 nextflow run <your pipeline>
2017-11-29 10:55:29 - golden_roentgen - 6a1acf3211 6b12ae11-74d8-4395-9685-4bb91e05e324 nextflow run <your pipeline>
2017-11-29 09:57:37 6.2s silly_austin OK 6a1acf3211 a896b4da-4530-48e5-a519-39016adff6fb nextflow run <your pipeline>
Check the ones that have STATUS=ERR and then get the RUN NAME, then you can resume that particular run by doing:
nextflow run <your pipeline> -resume grave_lavoisier
/
#Checking working folders for processes in a session:
#First list the previous runs to find the RUN NAME of the session of interest
> nextflow log
TIMESTAMP DURATION RUN NAME STATUS REVISION ID SESSION ID COMMAND
2017-11-24 18:41:34 672ms ecstatic_noether OK bab98280bf 7a8fefda-c812-4842-9248-2fd1b8d1d1e1 nextflow run <your pipeline>
2017-11-29 10:55:15 2.5s grave_lavoisier OK 6a1acf3211 56c9a1a1-ad16-4671-b98a-96adbd5051f2 nextflow run <your pipeline>
2017-11-29 10:55:29 - golden_roentgen - 6a1acf3211 6b12ae11-74d8-4395-9685-4bb91e05e324 nextflow run <your pipeline>
2017-11-29 09:57:37 6.2s silly_austin OK 6a1acf3211 a896b4da-4530-48e5-a519-39016adff6fb nextflow run <your pipeline>
#Then pass that RUN NAME to 'nextflow log' to list the work folders of its tasks
> nextflow log silly_austin
//
/ #Combining 2 channels:
#!/usr/bin/env nextflow
echo true
samples = Channel.from '10000', '20000', '30000'
chrs = Channel.from 'chr1', 'chr2'
all = samples.spread(chrs)
all.println()
# This will produce:
[10000, chr1]
[10000, chr2]
[20000, chr1]
[20000, chr2]
[30000, chr1]
[30000, chr2]
//
# Executor in a config file is used to set optional config settings for a given executor. For example, for 'lsf' put this in a config file
that will be run in the command line using -C:
executor {
name = 'lsf'
queueSize= 500
}
'queueSize' is used to set the limit of jobs submitted to cluster
//
# checking the progress of nextflow processes:
1) Enter bjobs -w # examine the EXEC_HOST of the process you want to check
2) ssh exec_host
3) Enter 'ps ax |grep ernesto'
4) Locate the workdir where nextflow is running the process
# Constructing a file path with a param:
include { ALLELIC_PRIMITIVES; RUN_VT_UNIQ } from "${params.NF_MODULES}/processes/normalization.nf"
//
# Check files attributes
https://www.nextflow.io/docs/latest/script.html#check-file-attributes
//
# Optional publishdir
// Publishing is switched off unless --publish is given on the command line.
params.publish = false

process test_something {
    // Directives must be declared at the top of the process body, before the
    // input/output blocks. 'enabled' turns publishing on or off from the param.
    publishDir "test/", enabled: params.publish
    echo true

    input:
    val x from chan

    output:
    file("*") into chen

    script:
    if (params.publish) {
        println "publishing"
    }
    """
    echo $x > ${x}.txt
    """
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment