Skip to content

Instantly share code, notes, and snippets.

@abhi18av
Forked from rpetit3/main.nf
Created July 9, 2021 13:48
Show Gist options
  • Save abhi18av/a66847f9c55990381a7b2386826234e8 to your computer and use it in GitHub Desktop.
bactopia-wrapper-nextflow
#! /usr/bin/env nextflow
import groovy.json.JsonSlurper
import groovy.text.SimpleTemplateEngine
import groovy.util.FileNameByRegexFinder
import java.nio.file.Path
import java.nio.file.Paths
import nextflow.util.SysHelper
// Workflow identity, taken from the manifest scope in nextflow.config
PROGRAM_NAME = workflow.manifest.name
VERSION = workflow.manifest.version
// Setup output directories
// Fall back to the EC2 host defaults when the corresponding --param is not given
datasets = params.datasets ? params.datasets : '/home/ec2-user/datasets'
bactopia = params.bactopia ? params.bactopia : '/home/ec2-user/bactopia'
conda = params.conda ? params.conda : '/home/ec2-user/bactopia-envs'
outdir = params.outdir ? params.outdir : './'
// Server names a sample is randomly assigned to (see process_accessions);
// also used as the first path component under outdir when publishing
SERVERS = ['olaf', 'loma']
// Setup some defaults
log.info "${PROGRAM_NAME} - ${VERSION}"
process bactopia {
/* Gather up input FASTQs for analysis. */
// Runs the full Bactopia pipeline for a single accession inside its own
// throw-away run directory, then publishes a tarball of the results:
// a successful run yields <accession>-complete.tar.gz, a run that fails on
// the final allowed attempt yields <accession>-incomplete.tar.gz.
publishDir "${outdir}/${server}/complete/", mode: "${params.publish_mode}", overwrite: params.overwrite, pattern: "${accession}-complete.tar.gz"
publishDir "${outdir}/${server}/incomplete/", mode: "${params.publish_mode}", overwrite: params.overwrite, pattern: "${accession}-incomplete.tar.gz"
tag "${accession}"
input:
set val(accession), val(server) from create_input_channel()
output:
// At most one of the two tarballs is produced per task
file "${accession}-incomplete.tar.gz" optional true
file "${accession}-complete.tar.gz" optional true
shell:
// On the last tolerated attempt, keep partial results instead of failing.
// NOTE(review): the process scope sets maxRetries = 5, so attempts 4 and 5
// never set allow_failed — confirm whether the hard-coded 3 is intentional.
allow_failed = false
if (task.attempt == 3) {
allow_failed = true
}
// Arguments for the nested 'nextflow run' of Bactopia itself
opts = "${bactopia}/main.nf -c ${bactopia}/nextflow.config --accession ${accession} --datasets ${datasets} --condadir ${conda} --species 'Staphylococcus aureus' --genome_size median -qs ${task.cpus}"
"""
mkdir !{accession}-run
cd !{accession}-run
if nextflow run !{opts}; then
# Everything finished, pack up the results and clean up
mkdir complete/
mv bactopia-info/ !{accession}/ complete/
tar -cf - complete/ | pigz -n --best -p !{task.cpus} > !{accession}-complete.tar.gz
mv !{accession}-complete.tar.gz ../
else
# Run failed
if [[ "!{allow_failed}" == 'true' ]]; then
# We've tried enough times, pack it up
mkdir incomplete
mv .nextflow* work/* bactopia-info/ !{accession}/ incomplete/
tar -cf - incomplete/ | pigz -n --best -p !{task.cpus} > !{accession}-incomplete.tar.gz
mv !{accession}-incomplete.tar.gz ../
else
rm -rf !{accession}-run
exit 1
fi
fi
cd ..
rm -rf !{accession}-run
"""
}
workflow.onComplete {
// Print a human-readable run summary once the workflow finishes
// (this handler also fires on failed runs; see workflow.success below).
// Use 'def' so these locals do not clobber script-binding variables —
// in particular a bare 'workDir' assignment shadows Nextflow's own.
def dir = new File("${workflow.workDir}")
def workDirSize = toHumanString(dir.directorySize())
// The size was previously printed twice (inline and on its own line);
// keep the dedicated "Working Dir Size" line only.
println """
Bactopia Execution Summary
---------------------------
Command Line : ${workflow.commandLine}
Resumed : ${workflow.resume}
Completed At : ${workflow.complete}
Duration : ${workflow.duration}
Success : ${workflow.success}
Exit Code : ${workflow.exitStatus}
Error Report : ${workflow.errorReport ?: '-'}
Launch Dir : ${workflow.launchDir}
Working Dir : ${workflow.workDir}
Working Dir Size: ${workDirSize}
"""
}
// Utility functions
def toHumanString(bytes) {
    /* Convert a byte count into a human-readable string, e.g. 1536 -> "1.5K". */
    // Thanks Niklaus
    // https://gist.github.com/nikbucher/9687112
    // Guard: Math.log(0) is -Infinity, which previously produced an invalid
    // (hugely negative) index into 'prefix'. Treat non-positive sizes as zero.
    if (bytes == null || bytes <= 0) {
        return '0'
    }
    // Declare locals with 'def' so they no longer leak into the script binding.
    def base = 1024L                      // binary units: 1K == 1024 bytes
    def decimals = 3                      // round to at most 3 decimal places
    def prefix = ['', 'K', 'M', 'G', 'T']
    // Which power of 1024 the value falls into, clamped to the largest prefix
    int i = Math.log(bytes) / Math.log(base) as Integer
    i = (i >= prefix.size() ? prefix.size() - 1 : i)
    return Math.round((bytes / base**i) * 10**decimals) / 10**decimals + prefix[i]
}
def print_version() {
    /* Print "<name> <version>" for this workflow, then terminate successfully. */
    println("${PROGRAM_NAME} ${VERSION}")
    exit 0
}
def process_accessions(accession) {
    /* Pair a non-empty accession with a randomly chosen server name. */
    // Empty strings (e.g. blank lines from the accessions file) produce no
    // tuple — the implicit return value is null in that case.
    if (!accession) {
        return null
    }
    def picked = SERVERS[new Random().nextInt(SERVERS.size())]
    return tuple(accession, picked)
}
def create_input_channel() {
    /* Build the channel of [accession, server] tuples feeding the bactopia process. */
    if (params.accession) {
        // A single accession given directly on the command line
        return process_accessions(params.accession)
    } else {
        // One accession per line. Blank lines make process_accessions return
        // null, which previously flowed straight into the process as an input;
        // drop those entries here.
        return Channel.fromPath( file(params.accessions) )
            .splitText()
            .map { line -> process_accessions(line.trim()) }
            .filter { it != null }
    }
}
def print_usage() {
    /* Print the usage text and exit; --help_all selects the extended help. */
    // Bug fix: usage_text was computed but never used — the message always
    // interpolated basic_help(), so --help_all had no effect.
    usage_text = params.help_all ? full_help() : basic_help()
    log.info"""
${PROGRAM_NAME} v${VERSION}
${usage_text}
""".stripIndent()
    if (params.conda_help) {
        // Cleanup up the directory
        // This is only meant to be used with tests for conda build
        file("./work/").deleteDir()
        file("./.nextflow/").deleteDir()
        def files = new FileNameByRegexFinder().getFileNames('./', '.nextflow.log*')
        files.each { new File(it).delete()}
    }
    exit 0
}
def basic_help() {
/* Build the basic --help text; defaults are interpolated from params at call time. */
// NOTE(review): genome_size is computed but not referenced in the text below
// (the string hard-codes "Mash estimate") — confirm whether it should be used.
genome_size = params.genome_size ? params.genome_size : "Mash Estimate"
// Typo fixes relative to the original text: Processing, An assembled,
// available, it's, "Directory Nextflow should use", --containerPath
// (matches params.containerPath), datasets, directory.
return """
Required Parameters:
### For Processing Multiple Samples
--fastqs STR An input file containing the sample name and
absolute paths to FASTQ/FASTAs to process
### For Processing A Single Sample
--R1 STR First set of reads for paired end in compressed (gzip)
FASTQ format
--R2 STR Second set of reads for paired end in compressed (gzip)
FASTQ format
--SE STR Single end set of reads in compressed (gzip) FASTQ format
--hybrid The SE should be treated as long reads for hybrid assembly.
--sample STR The name of the input sequences
### For Downloading from SRA/ENA or NCBI Assembly
**Note: Assemblies will have error free Illumina reads simulated for processing.**
--accessions An input file containing ENA/SRA Experiment accessions or
NCBI Assembly accessions to be processed
--accession A single ENA/SRA Experiment accession or NCBI Assembly accession
to be processed
### For Processing an Assembly
**Note: The assembly will have error free Illumina reads simulated for processing.**
--assembly STR An assembled genome in compressed FASTA format.
--reassemble The simulated reads will be used to create a new assembly.
Default: Use the original assembly, do not reassemble
Dataset Parameters:
--datasets DIR The path to available datasets that have
already been set up
--species STR Determines which species-specific dataset to
use for the input sequencing
Optional Parameters:
--coverage INT Reduce samples to a given coverage
Default: ${params.coverage}x
--genome_size INT Expected genome size (bp) for all samples, a value of '0'
will disable read error correction and read subsampling.
Special values (requires --species):
'min': uses minimum completed genome size of species
'median': uses median completed genome size of species
'mean': uses mean completed genome size of species
'max': uses max completed genome size of species
Default: Mash estimate
--outdir DIR Directory to write results to
Default: ${params.outdir}
Nextflow Queue Parameters:
At execution, Nextflow creates a queue and the number of slots in the queue is determined by the total number
of cores on the system. When a task is submitted to the queue, the total number of slots it occupies is
determined by the value set by "--cpus".
This can have a significant effect on the efficiency of the Nextflow's queue system. If "--cpus" is set to a
value that is equal to the number of cores available, in most cases only a single task will be able to run
because it's occupying all available slots.
When in doubt, "--cpus 4" is a safe bet, it is also the default value if you don't use "--cpus".
--max_time INT The maximum number of minutes a single task should run before being halted.
Default: ${params.max_time} minutes
--max_memory INT The maximum amount of memory (Gb) allowed to a single task.
Default: ${params.max_memory} Gb
--cpus INT Number of processors made available to a single task.
Default: ${params.cpus}
-qs Nextflow queue size. This parameter is very useful to limit the total number of
processors used on desktops, laptops or shared resources.
Default: Nextflow defaults to the total number of processors on your system.
Nextflow Related Parameters:
--infodir DIR Directory to write Nextflow summary files to
Default: ${params.infodir}
--condadir DIR Directory Nextflow should use for Conda environments
Default: Bactopia's Nextflow directory
--nfconfig STR A Nextflow compatible config file for custom profiles. This allows
you to create profiles specific to your environment (e.g. SGE,
AWS, SLURM, etc...). This config file is loaded last and will
overwrite existing variables if set.
Default: Bactopia's default configs
--nfdir Print directory Nextflow has pulled Bactopia to
--overwrite Nextflow will overwrite existing output files.
Default: ${params.overwrite}
--containerPath Path to Singularity containers to be used by the 'slurm'
profile.
Default: ${params.containerPath}
--sleep_time After reading datasets, the amount of time (seconds) Nextflow
will wait before execution.
Default: ${params.sleep_time} seconds
--publish_mode Set Nextflow's method for publishing output files. Allowed methods are:
'copy' (default) Copies the output files into the published directory.
'copyNoFollow' Copies the output files into the published directory
without following symlinks ie. copies the links themselves.
'link' Creates a hard link in the published directory for each
process output file.
'rellink' Creates a relative symbolic link in the published directory
for each process output file.
'symlink' Creates an absolute symbolic link in the published directory
for each process output file.
Default: ${params.publish_mode}
--force Nextflow will overwrite existing output files.
Default: ${params.force}
-resume Nextflow will attempt to resume a previous run. Please notice it is
only a single '-'
--cleanup_workdir After Bactopia is successfully executed, the work directory will be deleted.
Warning: by doing this you lose the ability to resume workflows.
Useful Parameters:
--skip_logs Logs for each process per sample will not be kept.
--available_datasets Print a list of available datasets found based
on location given by "--datasets"
--example_fastqs Print example of expected input for FASTQs file
--check_fastqs Verify "--fastqs" produces the expected inputs
--compress Compress (gzip) select outputs (e.g. annotation, variant calls)
to reduce overall storage footprint.
--keep_all_files Keeps all analysis files created. By default, intermediate
files are removed. This will not affect the ability
to resume Nextflow runs, and only occurs at the end
of the process.
--version Print workflow version information
--help Show this message and exit
--help_all Show a complete list of adjustable parameters
"""
}
// main script name
// Workflow metadata; main.nf reads name/version from here at startup
manifest {
author = 'Robert A. Petit III'
name = 'bactopia-wrapper'
homePage = 'https://github.com/bactopia/bactopia'
description = 'A wrapper for Bactopia to be used with AWS Batch.'
mainScript = 'main.nf'
version = '1.4.11'
nextflowVersion = '>=19' // minimum required Nextflow version
}
// Container/Conda env version
// Image tag used by the awsbatch profile (bactopia/aws-bactopia:<tag>)
container_version = '1.4.x'
// Default parameters
params {
// Bactopia
accession = null    // a single ENA/SRA/Assembly accession to process
accessions = null   // file listing accessions, one per line
bactopia = '/home/ec2-user/bactopia'     // Bactopia install directory
datasets = '/home/ec2-user/datasets'     // pre-built Bactopia datasets
conda = '/home/ec2-user/bactopia-envs'   // Conda environments directory
publish_mode = 'copy'   // how outputs are published (copy/link/symlink/...)
overwrite = false       // overwrite previously published outputs
cpus = 4                // per-task CPUs (scaled by attempt; see process scope)
outdir = './'           // where result tarballs are published
infodir = './'          // where timeline/report/trace files are written
help = null
version = null
}
process {
// Defaults
// Resources grow linearly on each retry (errorStrategy 'retry' below)
cpus = {params.cpus * task.attempt}
memory = {16.GB * task.attempt}
time = {90.m * task.attempt}
errorStrategy = 'retry'
maxRetries = 5
}
profiles {
// Default profile: local executor with the process defaults above
standard {}
// AWS Batch execution: spot queue, S3 staging, Docker containers
awsbatch {
executor {
name = 'awsbatch'
// NOTE(review): this path ('miniconda') differs from aws.batch.cliPath
// below ('miniconda3') — confirm which install actually exists on the AMI.
awscli = '/home/ec2-user/miniconda/bin/aws'
queueSize = 500
}
aws {
region = 'us-east-1'
client {
uploadStorageClass = 'STANDARD_IA'
}
batch {
cliPath = '/home/ec2-user/miniconda3/bin/aws'
maxParallelTransfers = 8
delayBetweenAttempts = 15
maxTransferAttempts = 5
// Host paths mounted into the Batch job containers
volumes = ['/home/ec2-user/bactopia', '/home/ec2-user/datasets', '/home/ec2-user/bactopia-envs']
}
}
process {
executor = 'awsbatch'
queue = 'nf-batch-spot'
container = "bactopia/aws-bactopia:${container_version}"
}
docker {
enabled = true
// Run containers as the invoking user so outputs are not root-owned
runOptions = '-u $(id -u):$(id -g)'
}
}
}
// Reporting configuration
// All Nextflow run reports are written under <infodir>/bactopia-info/
timeline {
enabled = true
file = "${params.infodir}/bactopia-info/bactopia-timeline.html"
}
report {
enabled = true
file = "${params.infodir}/bactopia-info/bactopia-report.html"
}
trace {
enabled = true
// Fixed: stray space inside the interpolation ("${params.infodir }"),
// now consistent with the timeline and report scopes above.
file = "${params.infodir}/bactopia-info/bactopia-trace.txt"
fields = 'task_id,hash,native_id,process,tag,name,status,exit,module,container,cpus,time,disk,memory,attempt,start,complete,duration,realtime,queue,%cpu,%mem,rss,vmem'
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment