@mpkocher
Created October 26, 2018 17:14
Legacy smrtpipe.py workflow examples from the RS
"""Filters sequences based on the pls.h5 file, removing apparently
doubly loaded wells and unloaded wells.
Note: The 'chunking' (i.e., chunkFunc) in the task decorator must be
consistent for each instance of SMRTFile. In other words, inputPlsFofn must be
scattered using the same function in *every* P_module. Therefore, the
chunkFunc must be set to context.numMovies.
"""
import os
from SMRTpipe.cluster.Scatter import ScatterFofn
from SMRTpipe.cluster.Gather import (GatherCSV, GatherSyncedFofn,
                                     GatherWithCat)
from SMRTpipe.modules.P_Module import P_Module
from SMRTpipe.engine.SmrtPipeTasks import task
from SMRTpipe.engine.DistributableTasks import LocallyDistributableTask
from SMRTpipe.engine.SmrtPipeFiles import (MovieFOFN, verifySyncedFofn,
                                           SMRTFile, SMRTDataFile)
from SMRTpipe.modules.P_Fetch import inputPlsFofn
from SMRTpipe.core.utils import toSmrtVersion
_MAJOR_VERSION = "1.7"
_version = "$Change: 138434 $"
_revision = "$Revision: #1 $"
__version__ = toSmrtVersion(_MAJOR_VERSION, _version)
## Generated by P_Filter ####################################################
#
# Before defining the module we define the set of
# files created by the tasks in this module.
#
# MovieFOFN is a special case of SMRTFile with added functionality.
# If a relative path is specified, it is resolved against the job directory.
filteredRgnFofn = MovieFOFN("data/filtered_regions.fofn")
#
# Verification functions for SMRTFiles are run when --debug is specified
filteredRgnFofn.addVerifyFunction(verifySyncedFofn(inputPlsFofn))
#
filteredRgnDir = SMRTFile("data/filtered_regions")
#
# SMRTDataFiles are automatically registered with the ReportService and become links
# in Martin and SMRTPortal. They require extra information because of this.
filterSummary = SMRTDataFile("data/filtered_summary.csv", group="General",
                             dataItem="Filtering",
                             format="csv")
# MK: Keeping this for potential testing purposes.
filterSubreadSummaryCsvOld = SMRTDataFile("data/filtered_subread_summary_OLD.csv",
                                          group="General",
                                          dataItem="Filtered Subreads",
                                          format="csv")
filterSubreadSummaryCsv = SMRTDataFile("data/filtered_subread_summary.csv",
                                       group="General",
                                       dataItem="Filtered Subreads",
                                       format="csv")
#
filteredSubreads = SMRTDataFile("data/filtered_subreads.fasta", group="General",
                                dataItem="Filtered Subreads",
                                format="fasta")
#
filteredSubreadFastq = SMRTDataFile("data/filtered_subreads.fastq", group="General",
                                    dataItem="Filtered Subreads",
                                    format="fastq")
### CCS
filteredCCSSubreads = SMRTDataFile("data/ccs_reads.fasta",
                                   group="General",
                                   dataItem="CCS Reads",
                                   format="fasta")
#
filteredCCSSubreadFastq = SMRTDataFile("data/ccs_reads.fastq", group="General",
                                       dataItem="CCS Reads",
                                       format="fastq")
def to_value(v):
    """Convert an accuracy value, given as a percent (0, 100) or a
    decimal (0, 1), to a decimal value."""
    try:
        # value was given as an integer percent (0, 100)
        n = int(v)
        if n == 1:
            # special case: treat 1 as 100%, not as 1%
            m = float(n)
        elif n < 0 or n > 100:
            raise ValueError("Unable to convert {v}. Value outside of range (0, 100)".format(v=v))
        else:
            m = n / 100.0
    except ValueError:
        # value was given as a decimal (or as an out-of-range int, whose
        # ValueError above lands here and is re-validated as a float)
        n = float(v)
        if n >= 0.0 and n <= 1.0:
            m = n
        elif n >= 0.0 and n <= 100.0:
            m = n / 100.0
        else:
            raise ValueError("Only supported values [0, 1] or [0, 100]. Got {v}".format(v=n))
    return m
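# Illustrative behavior of to_value, derived from the logic above:
#   to_value("85")   -> 0.85       (integer percent)
#   to_value("0.85") -> 0.85       (already a decimal)
#   to_value("1")    -> 1.0        (special-cased as 100%, not 1%)
#   to_value("150")  -> ValueError (outside both accepted ranges)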
class P_Filter(P_Module):
    """Filters sequences based on the pls.h5 file, removing apparently doubly
    loaded wells and unloaded wells."""
    VERSION = __version__

    def validateSettings(self):
        """Validate settings and define defaults.

        Returns [] if no errors are present.
        """
        errors = P_Module.validateSettings(self)
        self.reentrant = True
        self.nproc = int(self._context.config['NPROC'])
        self.useSubreads = self.setting("use_subreads", "True", namespace="global")
        self.useCCS = False
        self.filters = self.setting("filters", "")
        self.trim = self.setting("trim", "True")
        self.artifactScore = self.setting("artifact", None)
        if self.artifactScore is not None and int(self.artifactScore) > 0:
            errors.append("artifact score must be <= 0: %s" % self.artifactScore)
        # Common filters exposed
        param2Filter = {"readScore": "MinReadScore",
                        "minLength": "MinRL",
                        "maxLength": "MaxRL",
                        "minSubReadLength": "MinSRL",
                        "maxSubReadLength": "MaxSRL",
                        "whiteList": "ReadWhitelist",
                        "minSNR": "MinSNR"}
        filterStrings = self.filters.split(",")
        for param, filter_ in param2Filter.items():
            value = self.setting(param, None)
            if value is not None:
                # Special handling for readScore, which can be given as
                # a percent (0, 100) or a decimal value (0, 1)
                if param == 'readScore':
                    value = "{s:.4f}".format(s=to_value(value))
                filterStrings.append("%s=%s" % (filter_, value))
        self.filters = ",".join([fs for fs in filterStrings if fs.strip() != ""])
        exeOpts = {
            "--trim": self.trim,
            "--filter": self.filters
        }
        self.filterOpts = " ".join(["%s='%s'" % (k, v) for k, v in exeOpts.items()])
        return errors
    #
    # inputPlsFofn is imported from P_Fetch. It could also simply
    # be re-declared somewhere in this module using the same path.
    #
    # Setting the task's type to LocallyDistributable means it
    # will be distributed into NPROC chunks in a local run. In a
    # distributed run it will use the standard per-movie chunking
    # unless the defaults are overridden (see P_Consensus).
    #
    # For Distributable or LocallyDistributable tasks you must
    # also specify a set of Scatter and Gather functions. Each
    # input that has a scatter function and each output that has
    # a gather function will be broken up in a distributed run.
    #
    # Scatter and Gather tasks can be specified like normal
    # tasks with taskType=GatherTask or ScatterTask. Alternatively you
    # can use the pre-made scatter/gather tasks already defined for
    # commonly used file types (see SMRTpipe.cluster.Gather/Scatter).
    #
    # The errors parameter maps errors generated by this task
    # to more informative error messages, which are propagated up to
    # the smrtpipe.py master job when the task error is triggered.
    #
    @task(inputs={'plsFofn': inputPlsFofn},
          outputs={'rgnFofn': filteredRgnFofn,
                   'summary': filterSummary,
                   'rgnDir': filteredRgnDir},
          taskType=LocallyDistributableTask,
          scatters=[ScatterFofn(inputPlsFofn)],
          gathers=[GatherSyncedFofn(filteredRgnFofn, inputPlsFofn),
                   GatherCSV(filterSummary)],
          chunkFunc=lambda context: context.numMovies,
          nproc=2,
          errors={"Unable to locate BaseCalls within hdf5 file":
                  "No Base Calls found in Primary; Pulse2Base may have failed."},
          coreTask=True)
    def filter(self, files):
        """Performs the actual filtering step."""
        if not os.path.exists(files.rgnDir.path):
            os.mkdir(files.rgnDir.path)
        standardCmd = "filter_plsh5.py --debug %s --outputDir=%s --outputSummary=%s --outputFofn=%s %s"
        artifactCmd = "filter_artifacts.py --debug --filter='ForwardReverseScore=%s' " + \
                      "--outputDir=%s --outputSummary=%s --outputFofn=%s %s %s"
        if self.artifactScore is not None:
            tmpRgnFofn = self._context.createTemporaryFile(suffix='.fofn')
            tmpRgnDir = self._context.createTemporaryDir()
            if not os.path.exists(tmpRgnDir):
                os.mkdir(tmpRgnDir)
            return [standardCmd % (self.filterOpts, tmpRgnDir, files.summary.path, tmpRgnFofn, files.plsFofn.path),
                    artifactCmd % (self.artifactScore, files.rgnDir.path, files.summary.path, files.rgnFofn.path, files.plsFofn.path, tmpRgnFofn)]
        else:
            return standardCmd % (self.filterOpts, files.rgnDir.path, files.summary.path, files.rgnFofn.path, files.plsFofn.path)
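    # For illustration only (hypothetical filter values): with trim=True and
    # filters="MinReadScore=0.7500", standardCmd above renders roughly as
    #
    #   filter_plsh5.py --debug --trim='True' --filter='MinReadScore=0.7500' \
    #       --outputDir=data/filtered_regions \
    #       --outputSummary=data/filtered_summary.csv \
    #       --outputFofn=data/filtered_regions.fofn input.pls.fofn
    #
    # (--trim/--filter order follows dict iteration order; input.pls.fofn
    # stands in for files.plsFofn.path)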
    @task(inputs={'plsFofn': inputPlsFofn, 'rgnFofn': filteredRgnFofn},
          outputs={'subreads': filteredSubreads,
                   'subreadFastq': filteredSubreadFastq},
          taskType=LocallyDistributableTask,
          scatters=[ScatterFofn(inputPlsFofn), ScatterFofn(filteredRgnFofn)],
          gathers=[GatherWithCat(filteredSubreads),
                   GatherWithCat(filteredSubreadFastq)],
          chunkFunc=lambda context: context.numMovies,
          errors={"Assertion `nDims == 1' failed": "AG4C Primary Analysis Reports are no longer supported."})
    def subreads(self, files):
        """Generates the filtered (passed filtering) subreads file."""
        p2fOpts = " ".join(["-trimByRegion",
                            "-regionTable %s" % files.rgnFofn.path,
                            "" if self.useSubreads else "-noSplitSubreads"])
        cmds = []
        cmds.append("pls2fasta %s %s %s" % (files.plsFofn.path,
                                            files.subreads.path, p2fOpts))
        cmds.append("pls2fasta %s %s %s -fastq" % (files.plsFofn.path,
                                                   files.subreadFastq.path,
                                                   p2fOpts))
        return cmds
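    # For illustration (input.pls.fofn stands in for files.plsFofn.path),
    # the first command above renders as:
    #
    #   pls2fasta input.pls.fofn data/filtered_subreads.fasta \
    #       -trimByRegion -regionTable data/filtered_regions.fofn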
    @task(inputs={'plsFofn': inputPlsFofn, 'rgnFofn': filteredRgnFofn},
          outputs={'CCSsubreads': filteredCCSSubreads,
                   'CCSsubreadFastq': filteredCCSSubreadFastq},
          taskType=LocallyDistributableTask,
          scatters=[ScatterFofn(inputPlsFofn), ScatterFofn(filteredRgnFofn)],
          gathers=[GatherWithCat(filteredCCSSubreads),
                   GatherWithCat(filteredCCSSubreadFastq)],
          chunkFunc=lambda context: context.numMovies,
          errors={"Assertion `nDims == 1' failed": "AG4C Primary Analysis Reports are no longer supported."},
          # Disabled now that CCS is in Secondary. See bug #23648
          enabled=False,
          toggleable=True)
    def ccsSubreads(self, files):
        """Generates the filtered CCS reads in fasta and fastq form."""
        p2fOpts = " ".join(["-trimByRegion",
                            "-regionTable %s" % files.rgnFofn.path,
                            "" if self.useSubreads else "-noSplitSubreads"])
        cmds = []
        cmds.append("pls2fasta %s %s %s -ccs" % (files.plsFofn.path,
                                                 files.CCSsubreads.path, p2fOpts))
        cmds.append("pls2fasta %s %s %s -ccs -fastq" % (files.plsFofn.path,
                                                        files.CCSsubreadFastq.path,
                                                        p2fOpts))
        return cmds
    @task(inputs={'rgnFofn': filteredRgnFofn},
          outputs={'subreadSummary': filterSubreadSummaryCsv})
    def subreadSummary(self, files):
        """Generates the subread summary CSV."""
        exe = "filter_subread_summary.py"
        rgnFofn = files.rgnFofn.path
        outputCSV = files.subreadSummary.path
        cmd = "{e} {r} --output={o} --debug".format(e=exe, r=rgnFofn,
                                                    o=outputCSV)
        return cmd
#
# $Id: //depot/software/smrtanalysis/bioinformatics/tools/SMRTpipe/SMRTpipe/modules/P_Mapping.py#3 $
#
"""
v1.0 version of a module which aligns a set of reads to a reference
sequence.
"""
import logging
from SMRTpipe.engine.SmrtPipeWorkflow import SmrtWorkflowError
from SMRTpipe.engine.SmrtPipeFiles import SMRTDataFile, SMRTFile
from SMRTpipe.engine.SmrtPipeTasks import task
from SMRTpipe.engine.DistributableTasks import DistributableTask
from SMRTpipe.modules.P_Aligner import P_Aligner
from SMRTpipe.modules.P_Fetch import inputPlsFofn
from SMRTpipe.modules.P_Filter import filteredRgnFofn, filteredSubreads
from SMRTpipe.modules.P_Control import noControlRgnFofn, controlCmpH5
from SMRTpipe.modules.P_ReferenceUploader import hgapRefRepo
from SMRTpipe.engine.common import USE_GLOBAL_NPROC
from SMRTpipe.cluster.Scatter import ScatterFofn
from SMRTpipe.cluster.Gather import GatherNonEmptyCmpH5
from SMRTpipe.core.utils import toSmrtVersion
log = logging.getLogger(__name__)
_MAJOR_VERSION = "1.33"
_rev = "$Revision: #3 $"
_version = "$Change: 147572 $"
__version__ = toSmrtVersion(_MAJOR_VERSION, _version)
## Generated by P_Mapping ############################################
#
# Before defining the module we define the set of
# files created by the tasks in this module.
#
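# Note that several declarations below share the same physical path
# (data/aligned_reads.cmp.h5) but carry different 'state' values; the
# state appears to let downstream tasks depend on a specific stage of
# the in-place sort/repack pipeline.
#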
mappingCmpH5Presort = SMRTDataFile("data/aligned_reads.cmp.h5",
                                   group="Resequencing",
                                   dataItem="Aligned Reads",
                                   format="cmp.h5",
                                   state="presorted")
mappingCmpH5Postsort = SMRTDataFile("data/aligned_reads.cmp.h5",
                                    group="Resequencing",
                                    dataItem="Aligned Reads",
                                    format="cmp.h5",
                                    state="postsorted")
mappingCmpH5Postbarcode = SMRTDataFile("data/aligned_reads.cmp.h5",
                                       group="Resequencing",
                                       dataItem="Aligned Reads",
                                       format="cmp.h5",
                                       state="postbarcode")
mappingCmpH5 = SMRTDataFile("data/aligned_reads.cmp.h5",
                            group="Resequencing",
                            dataItem="Aligned Reads",
                            format="cmp.h5",
                            state="sorted")
coverageGFF = SMRTDataFile("data/alignment_summary.gff",
                           group="Resequencing",
                           dataItem="Coverage",
                           format="gff")
coverageBED = SMRTDataFile("data/coverage.bed",
                           group="Resequencing",
                           dataItem="Coverage",
                           format="bed")
unmappedSubreads = SMRTDataFile("data/unmappedSubreads.fasta",
                                group="Resequencing",
                                dataItem="Unmapped Subreads",
                                format="fasta")
from SMRTpipe.modules.P_CCS import ccsH5Fofn, ccsFasta
class P_Mapping(P_Aligner):
    """
    Aligns a set of reads to a target sequence and outputs them
    as contigs.
    """
    VERSION = __version__

    def validateSettings(self):
        """Extract relevant settings from the context and store them as
        attributes of this module, setting defaults and validating where
        necessary."""
        errors = P_Aligner.validateSettings(self)
        self.reentrant = True
        self.numCoverageRegions = int(self.setting('num_stats_regions', '500', namespace="global"))
        # NB: bool() of any non-empty string is True, so if settings are
        # returned as strings, a value of 'False' still enables deep sorting.
        self.useDeepSorting = bool(self.setting('useDeepSorting', 'True'))
        self.samReadGroups = self.setting('samReadGroups', 'movie')
        # Validate the reference
        try:
            self.loadRef()
        except SmrtWorkflowError as e:
            errors.append(e.msg)
        # Enable or disable CCS tasks
        if self.workflowHasModule('P_CCS'):
            self.disableTaskByName('align')
            self.enableTaskByName('alignCCS')
        else:
            self.disableTaskByName('alignCCS')
            self.enableTaskByName('align')
        return errors
    @task(inputs={'plsFofn': inputPlsFofn,
                  'rgnFofn': (noControlRgnFofn, filteredRgnFofn, inputPlsFofn),
                  'hgapRef': (hgapRefRepo, None)},
          outputs={'cmpH5': mappingCmpH5Presort},
          nproc=USE_GLOBAL_NPROC,
          taskType=DistributableTask,
          scatters=[ScatterFofn(inputPlsFofn),
                    ScatterFofn((noControlRgnFofn, filteredRgnFofn, inputPlsFofn))],
          gathers=[GatherNonEmptyCmpH5(mappingCmpH5Presort, nproc=2)],
          errors={"AssertionError: No alignments in the CmpH5 file(s).":
                  "No mapped reads: no further analysis is possible."},
          coreTask=True,
          enabled=True)
    def align(self, files):
        return (self._align(files), "echo 'Alignment Complete'",
                self._loadPulses(files), "echo 'LoadPulses Complete'")
    @task(inputs={'plsFofn': ccsH5Fofn},
          outputs={'cmpH5': mappingCmpH5Presort},
          nproc=USE_GLOBAL_NPROC,
          taskType=DistributableTask,
          scatters=[ScatterFofn(ccsH5Fofn)],
          gathers=[GatherNonEmptyCmpH5(mappingCmpH5Presort, nproc=2)],
          errors={"AssertionError: No alignments in the CmpH5 file(s).":
                  "No mapped reads: no further analysis is possible."},
          coreTask=True,
          enabled=False)
    def alignCCS(self, files):
        """Special task for running alignment on CCS files."""
        return (self._align(files), "echo 'Alignment Complete'",
                self._loadPulses(files), "echo 'LoadPulses Complete'")
    @task(inputs={'cmpH5Presort': mappingCmpH5Presort},
          outputs={'cmpH5': mappingCmpH5Postsort},
          nproc=USE_GLOBAL_NPROC,
          coreTask=True)
    def sort(self, files):
        deepFlag = "--deep" if self.useDeepSorting else ""
        return "cmph5tools.py -vv sort %s --inPlace %s" % (deepFlag, files.cmpH5.path)
    @task(inputs={'cmpH5Postsort': mappingCmpH5Postsort},
          outputs={'cmpH5': mappingCmpH5},
          nproc=USE_GLOBAL_NPROC,
          coreTask=True)
    def repack(self, files):
        # Compress the cmp.h5 with h5repack if it is on the PATH; otherwise
        # continue without repacking rather than failing the task.
        cmd = ("((which h5repack && (h5repack -f GZIP=1 %s %s_TMP && mv %s_TMP %s))"
               " || echo 'no h5repack found, continuing w/out')"
               % (files.cmpH5.path, files.cmpH5.path, files.cmpH5.path, files.cmpH5.path))
        return cmd
    @task(inputs={'cmpH5': mappingCmpH5},
          outputs={'covGFF': coverageGFF},
          toggleable=True,
          enabled=True)
    def covGFF(self, files):
        return 'summarize_coverage.py --numRegions=%d %s %s' % \
            (self.numCoverageRegions, files.cmpH5.path, files.covGFF.path)
    @task(inputs={'covGFF': coverageGFF},
          outputs={'covBED': coverageBED},
          toggleable=True,
          enabled=True)
    def gff2Bed(self, files):
        name = 'meanCoverage'
        purpose = 'coverage'
        description = 'Mean coverage of genome in fixed interval regions'
        return 'gffToBed.py --name=%s --description="%s" %s %s > %s' % \
            (name, description, purpose, files.covGFF.path, files.covBED.path)
    @task(inputs={'subreads': (ccsFasta, filteredSubreads),
                  'cmpH5': mappingCmpH5,
                  'ctrlCmpH5': (controlCmpH5, None)},
          outputs={'unmapped': unmappedSubreads},
          toggleable=True)
    def unmapped(self, files):
        """When specifying inputs, a tuple of files is resolved when the
        initial graph is created. The tuple resolves to the left-most file in
        the tuple that is generated by another task in the graph. If resolving
        the file reaches 'None' in the tuple, the input is dropped. So to
        specify an optional input X that doesn't necessarily have to exist,
        specify (X, None). In this task, controlCmpH5 is considered an input
        only if the P_Control module has been run and provides this file as
        an output.
        """
        ctrlCmpH5 = files.ctrlCmpH5.path if 'ctrlCmpH5' in files else ""
        return "extractUnmappedSubreads.py %s %s %s > %s" % (files.subreads.path, files.cmpH5.path, ctrlCmpH5, files.unmapped.path)