Skip to content

Instantly share code, notes, and snippets.

@heuermh
Created May 17, 2021 18:29
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save heuermh/e5c7b8c04b7cb7a689c5fae6309bc7b2 to your computer and use it in GitHub Desktop.
Save heuermh/e5c7b8c04b7cb7a689c5fae6309bc7b2 to your computer and use it in GitHub Desktop.
Example: running a Spark shell (adam-shell) Scala script inside a Nextflow process
#!/usr/bin/env nextflow
/*
* The authors of this file license it to you under the
* Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You
* may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
// NOTE(review): the processes below use DSL1 channel syntax (`from` / `into`);
// newer Nextflow releases no longer accept DSL1, so run this with a
// DSL1-capable Nextflow or port it to DSL2.

// Input directory holding the read pairs, named <sample>_1.fq / <sample>_2.fq.
params.dir = "${baseDir}/example"
// Directory that sampleFragments publishes its gzipped FASTQ output to.
params.resultsDir = "${baseDir}/results"
// Spark arguments handed to both adam-submit and adam-shell; override this
// parameter to change the Spark master or to add Spark configuration.
params.sparkArgs = '--master local[*]'

// Glob matching both mates of every read pair.
fastqFiles = "${params.dir}/*_{1,2}.fq"
// Emits one (sample, [mate files]) tuple per matched pair.
fastqs = Channel.fromFilePairs(size: 2, fastqFiles)
/*
 * Convert each FASTQ read pair into an ADAM fragment dataset
 * named <sample>.fragments.adam.
 */
process convertFragments {
    tag { sample }
    container "quay.io/biocontainers/adam:0.35.0--hdfd78af_0"

    input:
    // path (not val) stages both FASTQ files into the task work directory,
    // so they are visible inside the container and on executors that do not
    // share the launch directory's file system.
    tuple val(sample), path(fastq) from fastqs

    output:
    tuple val(sample), path("${sample}.fragments.adam") into fragments

    script:
    // fastq holds the two files matched by *_{1,2}.fq; fastq[0] is presumably
    // the _1 mate (positional input) and fastq[1] the _2 mate (-paired_fastq).
    // TODO confirm the ordering guarantee of fromFilePairs.
    """
    adam-submit \
        ${params.sparkArgs} \
        -- \
        transformFragments \
        -paired_fastq ${fastq[1]} \
        ${fastq[0]} \
        ${sample}.fragments.adam
    """
}
// Target numbers of fragments to draw from every converted dataset.
sampleSizes = Channel.fromList([100_000, 1_000_000, 10_000_000, 100_000_000])
// Cartesian product: every (sample, fragments) tuple paired with every size,
// giving (sample, fragments, sampleSize) tuples for sampleFragments.
samples = fragments.combine(sampleSizes)
/*
 * Randomly down-sample each fragment dataset to approximately sampleSize
 * fragments and publish the result as a pair of gzipped FASTQ files.
 *
 * The sampling runs as a Scala script in adam-shell. The script text is a
 * Groovy string, so Nextflow interpolates the task inputs (dataset path,
 * sample name, sample size) into it before it is written to script.scala.
 */
process sampleFragments {
    tag { "${sample}.${sampleSize}" }
    publishDir "${params.resultsDir}", mode: 'copy'
    container "quay.io/biocontainers/adam:0.35.0--hdfd78af_0"

    input:
    tuple val(sample), path(fragments), val(sampleSize) from samples

    output:
    tuple val(sample), path("${sample}.${sampleSize}_1.fq.gz"), path("${sample}.${sampleSize}_2.fq.gz") into pairedFastq

    script:
    // def keeps the Scala source local to this task's script block. It is
    // written out below with echo '...', so it must not contain single quotes.
    def scala = """
    import org.apache.spark.rdd.RDD
    import org.bdgenomics.adam.ds.ADAMContext._
    import org.bdgenomics.formats.avro.Fragment
    val fragments = sc.loadFragments("${fragments}")
    val count: Long = fragments.rdd.count()
    // RDD.sample without replacement requires 0 <= fraction <= 1: cap the
    // fraction so sample sizes larger than the input keep every fragment,
    // and guard the empty case so the division never yields a non-finite value.
    // The fraction is a per-fragment probability, so the output size is approximate.
    val fraction: Double = if (count == 0L) 1.0 else math.min(1.0, ${sampleSize}.toDouble / count)
    val sampled = fragments.transform((rdd: RDD[Fragment]) => rdd.sample(withReplacement = false, fraction = fraction, seed = 42))
    // One self-contained statement (no leading-dot continuation lines), so it
    // parses the same whether the REPL evaluates the file line by line or whole.
    sampled.toAlignments().saveAsPairedFastq(
      fileName1 = "${sample}.${sampleSize}_1.fq",
      fileName2 = "${sample}.${sampleSize}_2.fq",
      asSingleFile = true
    )
    System.exit(0)
    """

    """
    echo '${scala}' > script.scala
    adam-shell \
        ${params.sparkArgs} \
        -I script.scala
    gzip ${sample}.${sampleSize}_1.fq
    gzip ${sample}.${sampleSize}_2.fq
    """
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment