Created
October 26, 2018 17:14
-
-
Save mpkocher/c892f309c1d298ed0de2aa6c0db2d317 to your computer and use it in GitHub Desktop.
Legacy smrtpipe.py workflow examples from the RS
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""Filters sequences based on the pls.h5 file, removing apparently | |
doubly loaded wells and unloaded wells. | |
Note: The 'chunking' (i.e., chunkFunc) in the task decorator must be | |
consistent for each instance of SMRTFile. In other words, inputPlsFofn must be | |
scattered using the same function in *every* P_module. Therefore, the | |
chunkFunc must be set to context.numMovies. | |
""" | |
import os | |
from SMRTpipe.cluster.Scatter import ScatterFofn | |
from SMRTpipe.cluster.Gather import (GatherCSV, GatherSyncedFofn, | |
GatherWithCat) | |
from SMRTpipe.modules.P_Module import P_Module | |
from SMRTpipe.engine.SmrtPipeTasks import task | |
from SMRTpipe.engine.DistributableTasks import LocallyDistributableTask | |
from SMRTpipe.engine.SmrtPipeFiles import (MovieFOFN, verifySyncedFofn, | |
SMRTFile, SMRTDataFile) | |
from SMRTpipe.modules.P_Fetch import inputPlsFofn | |
from SMRTpipe.core.utils import toSmrtVersion | |
_MAJOR_VERSION = "1.7"
# Perforce keyword fields; expanded by the VCS at submit time.
_version = "$Change: 138434 $"
_revision = "$Revision: #1 $"
__version__ = toSmrtVersion(_MAJOR_VERSION, _version)
## Generated by P_Filter ####################################################
#
# Before defining the module we define the set of
# files created by the tasks in this module.
#
# MovieFOFN is a special case of SMRTFile with added functionality.
# If a relative path is specified, it is resolved against the job directory.
filteredRgnFofn = MovieFOFN("data/filtered_regions.fofn")
#
# Verification functions for SMRTFiles are run when --debug is specified
filteredRgnFofn.addVerifyFunction(verifySyncedFofn(inputPlsFofn))
#
filteredRgnDir = SMRTFile("data/filtered_regions")
#
# SMRTDataFiles are automatically registered with the ReportService and become links
# in Martin and SMRTPortal. They require extra information because of this.
filterSummary = SMRTDataFile("data/filtered_summary.csv", group="General",
                             dataItem="Filtering",
                             format="csv")
# MK: Keeping this for potential testing purposes.
filterSubreadSummaryCsvOld = SMRTDataFile("data/filtered_subread_summary_OLD.csv",
                                          group="General",
                                          dataItem="Filtered Subreads",
                                          format="csv")
filterSubreadSummaryCsv = SMRTDataFile("data/filtered_subread_summary.csv",
                                       group="General",
                                       dataItem="Filtered Subreads",
                                       format= "csv")
#
# Primary sequence outputs of the filter stage, in both fasta and fastq form.
filteredSubreads = SMRTDataFile("data/filtered_subreads.fasta", group="General",
                                dataItem="Filtered Subreads",
                                format="fasta")
#
filteredSubreadFastq= SMRTDataFile("data/filtered_subreads.fastq", group="General",
                                   dataItem="Filtered Subreads",
                                   format="fastq")
### CCS
# CCS outputs are declared here but the task producing them is disabled
# by default (see ccsSubreads below).
filteredCCSSubreads = SMRTDataFile("data/ccs_reads.fasta",
                                   group="General",
                                   dataItem="CCS Reads",
                                   format="fasta")
#
filteredCCSSubreadFastq = SMRTDataFile("data/ccs_reads.fastq", group="General",
                                       dataItem="CCS Reads",
                                       format="fastq")
def to_value(v):
    """Convert an accuracy given as a string to a decimal fraction.

    Accepts either a percent in (0, 100) (e.g. "75" -> 0.75) or a decimal
    in (0, 1) (e.g. "0.75" -> 0.75).

    :param v: accuracy as a string (or number); integer strings are treated
        as percents, non-integer strings as decimals (re-scaled if > 1).
    :returns: accuracy as a float in [0.0, 1.0]
    :raises ValueError: if the value cannot be parsed or is out of range.

    Note: the original implementation raised the out-of-range error for
    integer inputs *inside* the try block whose ``except ValueError``
    immediately swallowed it, so callers always saw the generic decimal-path
    message instead. The parse and the range check are now separated.
    """
    try:
        n = int(v)
    except ValueError:
        # Value was not an integer; parse it as a decimal (or a
        # fractional percent such as "85.5").
        n = float(v)
        if 0.0 <= n <= 1.0:
            return n
        if 0.0 <= n <= 100.0:
            return n / 100.0
        raise ValueError("Only supported values [0, 1] or [0, 100]. Got {v}".format(v=n))
    if n == 1:
        # Special case: a bare "1" means a fraction of 1.0 (100%), not 1%.
        return float(n)
    if n < 0 or n > 100:
        raise ValueError("Unable to convert {v}. Value outside of range (0, 100)".format(v=v))
    return n / 100.0
class P_Filter(P_Module):
    """Filters sequences based on the pls.h5 file, removing apparently doubly
    loaded wells and unloaded wells."""
    VERSION = __version__

    def validateSettings(self):
        """Validate settings and define DEFAULTS.

        Returns a list of error strings; [] if no errors are present.
        """
        errors = P_Module.validateSettings(self)
        self.reentrant = True
        self.nproc = int(self._context.config['NPROC'])
        self.useSubreads = self.setting("use_subreads", "True", namespace="global")
        self.useCCS = False
        self.filters = self.setting("filters", "")
        self.trim = self.setting("trim", "True")
        self.artifactScore = self.setting("artifact", None)
        if self.artifactScore is not None and int(self.artifactScore) > 0:
            errors.append("artifact score must be <= 0: %s" % self.artifactScore)
        # Common filters exposed as individual settings; each maps to the
        # filter name understood by filter_plsh5.py's --filter option.
        param2Filter = { "readScore"        : "MinReadScore",
                         "minLength"        : "MinRL",
                         "maxLength"        : "MaxRL",
                         "minSubReadLength" : "MinSRL",
                         "maxSubReadLength" : "MaxSRL",
                         "whiteList"        : "ReadWhitelist",
                         "minSNR"           : "MinSNR" }
        filterStrings = self.filters.split(",")
        for param, filter_ in param2Filter.items():
            value = self.setting(param, None)
            if value is not None:
                # Special handling for readScore which can be given as
                # a percent (0, 100) or decimal value (0, 1).
                # Fixed: was `param is 'readScore'` -- identity comparison
                # of strings only works by accident of CPython interning.
                if param == 'readScore':
                    value = "{s:.4f}".format(s=to_value(value))
                filterStrings.append("%s=%s" % (filter_, value))
        # Drop empty entries (e.g. from an empty 'filters' setting).
        self.filters = ",".join([fs for fs in filterStrings if fs.strip() != ""])
        exeOpts = {
            "--trim"   : self.trim,
            "--filter" : self.filters
        }
        self.filterOpts = " ".join(["%s='%s'" % (k, v) for k, v in exeOpts.items()])
        return errors

    #
    # inputPlsFofn is imported from S_Fetch. It could also simply
    # be re-declared somewhere in this module using the same path.
    #
    # Setting the task's type to be LocallyDistributable means it
    # will be distributed into NPROC chunks in a local run. In a
    # distributed run it will use the standard per-movie chunking
    # unless the defaults are overridden (see P_Consensus)
    #
    # For Distributable or LocallyDistributable tasks you must
    # also specify a set of Scatter and Gather functions. Each
    # input shared with a scatter function and output shared with
    # a gather function will be broken up in a distributed run.
    #
    # Scatter and Gather tasks can be specified similar to normal
    # tasks with taskType=GatherTask or ScatterTask. Alternatively you
    # can use the pre-made scatter/gather tasks already defined for
    # commonly used file types (see SMRTpipe.cluster.Gather/Scatter)
    #
    # The errors parameter will map errors generated by this task
    # to more informative error messages which are propagated up to
    # the smrtpipe.py master job when the task error is triggered.
    #
    @task(inputs={'plsFofn': inputPlsFofn},
          outputs={'rgnFofn': filteredRgnFofn,
                   'summary': filterSummary,
                   'rgnDir': filteredRgnDir},
          taskType=LocallyDistributableTask,
          scatters=[ScatterFofn(inputPlsFofn)],
          gathers=[GatherSyncedFofn(filteredRgnFofn, inputPlsFofn),
                   GatherCSV(filterSummary)],
          chunkFunc=lambda context: context.numMovies,
          nproc=2,
          errors={"Unable to locate BaseCalls within hdf5 file":
                  "No Base Calls found in Primary; Pulse2Base may have failed."},
          coreTask=True)
    def filter(self, files):
        """Performs the actual filtering step.

        Returns the shell command(s) to run: a single filter_plsh5.py call,
        or a two-step pipeline (standard filter into temporaries, then
        filter_artifacts.py producing the final outputs) when an artifact
        score was configured.
        """
        if not os.path.exists(files.rgnDir.path):
            os.mkdir(files.rgnDir.path)
        standardCmd = "filter_plsh5.py --debug %s --outputDir=%s --outputSummary=%s --outputFofn=%s %s"
        artifactCmd = "filter_artifacts.py --debug --filter='ForwardReverseScore=%s' " +\
                      "--outputDir=%s --outputSummary=%s --outputFofn=%s %s %s"
        if self.artifactScore is not None:
            # Stage the standard filter output in temporaries; the artifact
            # filter then writes the final region FOFN/directory.
            tmpRgnFofn = self._context.createTemporaryFile(suffix='.fofn')
            tmpRgnDir = self._context.createTemporaryDir()
            if not os.path.exists(tmpRgnDir):
                os.mkdir(tmpRgnDir)
            return [standardCmd % (self.filterOpts, tmpRgnDir, files.summary.path, tmpRgnFofn, files.plsFofn.path),
                    artifactCmd % (self.artifactScore, files.rgnDir.path, files.summary.path, files.rgnFofn.path, files.plsFofn.path, tmpRgnFofn)]
        else:
            return standardCmd % (self.filterOpts, files.rgnDir.path, files.summary.path, files.rgnFofn.path, files.plsFofn.path)

    @task(inputs={'plsFofn': inputPlsFofn, 'rgnFofn': filteredRgnFofn},
          outputs={'subreads': filteredSubreads,
                   'subreadFastq': filteredSubreadFastq},
          taskType=LocallyDistributableTask,
          scatters=[ScatterFofn(inputPlsFofn), ScatterFofn(filteredRgnFofn)],
          gathers=[GatherWithCat(filteredSubreads),
                   GatherWithCat(filteredSubreadFastq)],
          chunkFunc=lambda context: context.numMovies,
          errors={"Assertion `nDims == 1' failed": "AG4C Primary Analysis Reports are no longer supported."})
    def subreads(self, files):
        """Generates the filtered (passed filtering) subreads file.

        Emits two pls2fasta commands: fasta then fastq output.
        """
        p2fOpts = " ".join(["-trimByRegion",
                            "-regionTable %s" % files.rgnFofn.path,
                            "" if self.useSubreads else "-noSplitSubreads"])
        cmds = []
        cmds.append("pls2fasta %s %s %s" % (files.plsFofn.path,
                                            files.subreads.path, p2fOpts))
        cmds.append("pls2fasta %s %s %s -fastq" % (files.plsFofn.path,
                                                   files.subreadFastq.path,
                                                   p2fOpts))
        return cmds

    @task(inputs={'plsFofn': inputPlsFofn, 'rgnFofn': filteredRgnFofn},
          outputs={'CCSsubreads': filteredCCSSubreads, 'CCSsubreadFastq': filteredCCSSubreadFastq},
          taskType=LocallyDistributableTask,
          scatters=[ScatterFofn(inputPlsFofn), ScatterFofn(filteredRgnFofn)],
          gathers=[GatherWithCat(filteredCCSSubreads), GatherWithCat(filteredCCSSubreadFastq)],
          chunkFunc=lambda context: context.numMovies,
          errors={"Assertion `nDims == 1' failed": "AG4C Primary Analysis Reports are no longer supported."},
          # Disabled now that CCS is in Secondary. See bug #23648
          enabled=False,
          toggleable=True
          )
    def ccsSubreads(self, files):
        """Generates the CCS read fasta/fastq files (pls2fasta -ccs)."""
        p2fOpts = " ".join(["-trimByRegion",
                            "-regionTable %s" % files.rgnFofn.path,
                            "" if self.useSubreads else "-noSplitSubreads"])
        cmds = []
        cmds.append("pls2fasta %s %s %s -ccs" % (files.plsFofn.path,
                                                 files.CCSsubreads.path, p2fOpts))
        cmds.append("pls2fasta %s %s %s -ccs -fastq" % (files.plsFofn.path,
                                                        files.CCSsubreadFastq.path,
                                                        p2fOpts))
        return cmds

    @task(inputs={'rgnFofn': filteredRgnFofn},
          outputs={'subreadSummary': filterSubreadSummaryCsv})
    def subreadSummary(self, files):
        """Generate the filtered subread summary CSV."""
        exe = "filter_subread_summary.py"
        rgnFofn = files.rgnFofn.path
        outputCSV = files.subreadSummary.path
        cmd = "{e} {r} --output={o} --debug".format(e=exe, r=rgnFofn,
                                                    o=outputCSV)
        return cmd
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# | |
# $Id: //depot/software/smrtanalysis/bioinformatics/tools/SMRTpipe/SMRTpipe/modules/P_Mapping.py#3 $ | |
# | |
""" | |
v1.0 version of a module which aligns a set of reads to a reference | |
sequence. | |
""" | |
import logging | |
from SMRTpipe.engine.SmrtPipeWorkflow import SmrtWorkflowError | |
from SMRTpipe.engine.SmrtPipeFiles import SMRTDataFile, SMRTFile | |
from SMRTpipe.engine.SmrtPipeTasks import task | |
from SMRTpipe.engine.DistributableTasks import DistributableTask | |
from SMRTpipe.modules.P_Aligner import P_Aligner | |
from SMRTpipe.modules.P_Fetch import inputPlsFofn | |
from SMRTpipe.modules.P_Filter import filteredRgnFofn, filteredSubreads | |
from SMRTpipe.modules.P_Control import noControlRgnFofn, controlCmpH5 | |
from SMRTpipe.modules.P_ReferenceUploader import hgapRefRepo | |
from SMRTpipe.engine.common import USE_GLOBAL_NPROC | |
from SMRTpipe.cluster.Scatter import ScatterFofn | |
from SMRTpipe.cluster.Gather import GatherNonEmptyCmpH5 | |
from SMRTpipe.core.utils import toSmrtVersion | |
log = logging.getLogger(__name__)
_MAJOR_VERSION = "1.33"
# Perforce keyword fields; expanded by the VCS at submit time.
_rev = "$Revision: #3 $"
_version = "$Change: 147572 $"
__version__ = toSmrtVersion(_MAJOR_VERSION, _version)
## Generated by P_Mapping ############################################
#
# Before defining the module we define the set of
# files created by the tasks in this module.
#
# NOTE: the four mappingCmpH5* handles below all point at the same
# on-disk file ("data/aligned_reads.cmp.h5"); the distinct 'state'
# markers let the workflow engine order the tasks that transform the
# file in place (presorted -> postsorted -> sorted).
mappingCmpH5Presort = SMRTDataFile("data/aligned_reads.cmp.h5",
                                   group = "Resequencing",
                                   dataItem = "Aligned Reads",
                                   format = "cmp.h5",
                                   state = "presorted")
mappingCmpH5Postsort = SMRTDataFile("data/aligned_reads.cmp.h5",
                                    group = "Resequencing",
                                    dataItem = "Aligned Reads",
                                    format = "cmp.h5",
                                    state = "postsorted")
mappingCmpH5Postbarcode = SMRTDataFile("data/aligned_reads.cmp.h5",
                                       group = "Resequencing",
                                       dataItem = "Aligned Reads",
                                       format = "cmp.h5",
                                       state = "postbarcode")
mappingCmpH5 = SMRTDataFile("data/aligned_reads.cmp.h5",
                            group = "Resequencing",
                            dataItem = "Aligned Reads",
                            format = "cmp.h5",
                            state = "sorted")
coverageGFF = SMRTDataFile("data/alignment_summary.gff",
                           group = "Resequencing",
                           dataItem = "Coverage",
                           format = "gff" )
coverageBED = SMRTDataFile("data/coverage.bed",
                           group = "Resequencing",
                           dataItem = "Coverage",
                           format = "bed" )
unmappedSubreads = SMRTDataFile("data/unmappedSubreads.fasta",
                                group = "Resequencing",
                                dataItem = "Unmapped Subreads",
                                format = "fasta")
# Imported here, after the file definitions above, rather than at the
# top of the module -- presumably to avoid a circular import with
# P_CCS; TODO confirm before moving.
from SMRTpipe.modules.P_CCS import ccsH5Fofn, ccsFasta
class P_Mapping(P_Aligner):
    """
    Will align a set of reads to a target sequence and output them
    as contigs.
    """
    VERSION = __version__

    def validateSettings(self):
        """Extract relevant settings from the context and store as attributes
        of this module, setting defaults and validating where necessary.

        Returns a list of error strings; [] if no errors are present.
        """
        errors = P_Aligner.validateSettings(self)
        self.reentrant = True
        self.numCoverageRegions = int(self.setting('num_stats_regions', '500', namespace="global"))
        # Fixed: was bool(self.setting(...)) -- bool() of any non-empty
        # string (including "False") is True, so deep sorting could never
        # be switched off. Interpret the setting string explicitly instead.
        self.useDeepSorting = str(self.setting('useDeepSorting', 'True')).lower() \
            not in ('false', '0', '')
        self.samReadGroups = self.setting('samReadGroups', 'movie')
        # Validate the Reference
        try:
            self.loadRef()
        # 'as' form (was Python-2-only comma syntax), consistent with
        # the exception handling style used in P_Filter.
        except SmrtWorkflowError as e:
            errors.append(e.msg)
        # Enable or disable CCS tasks: exactly one of align/alignCCS runs,
        # depending on whether the workflow includes the P_CCS module.
        if self.workflowHasModule('P_CCS'):
            self.disableTaskByName('align')
            self.enableTaskByName('alignCCS')
        else:
            self.disableTaskByName('alignCCS')
            self.enableTaskByName('align')
        return errors

    @task(inputs={'plsFofn': inputPlsFofn,
                  'rgnFofn': (noControlRgnFofn, filteredRgnFofn, inputPlsFofn),
                  'hgapRef': (hgapRefRepo, None)},
          outputs={'cmpH5': mappingCmpH5Presort},
          nproc=USE_GLOBAL_NPROC,
          taskType=DistributableTask,
          scatters=[ScatterFofn(inputPlsFofn),
                    ScatterFofn((noControlRgnFofn, filteredRgnFofn, inputPlsFofn))],
          gathers=[GatherNonEmptyCmpH5(mappingCmpH5Presort, nproc=2)],
          errors={"AssertionError: No alignments in the CmpH5 file(s).":
                  "No mapped reads: no further analysis is possible."},
          coreTask=True,
          enabled=True)
    def align(self, files):
        """Align filtered subreads and load pulse metrics into the cmp.h5."""
        return (self._align(files), "echo 'Alignment Complete'",
                self._loadPulses(files), "echo 'LoadPulses Complete'")

    @task(inputs={'plsFofn': ccsH5Fofn},
          outputs={'cmpH5': mappingCmpH5Presort},
          nproc=USE_GLOBAL_NPROC,
          taskType=DistributableTask,
          scatters=[ScatterFofn(ccsH5Fofn)],
          gathers=[GatherNonEmptyCmpH5(mappingCmpH5Presort, nproc=2)],
          errors={"AssertionError: No alignments in the CmpH5 file(s).":
                  "No mapped reads: no further analysis is possible."},
          coreTask=True,
          enabled=False)
    def alignCCS(self, files):
        """Special task for running alignment on CCS files."""
        return (self._align(files), "echo 'Alignment Complete'",
                self._loadPulses(files), "echo 'LoadPulses Complete'")

    @task(inputs={'cmpH5Presort': mappingCmpH5Presort},
          outputs={'cmpH5': mappingCmpH5Postsort},
          nproc=USE_GLOBAL_NPROC,
          coreTask=True)
    def sort(self, files):
        """Sort the cmp.h5 in place (optionally with --deep)."""
        deepFlag = "--deep" if self.useDeepSorting else ""
        return "cmph5tools.py -vv sort %s --inPlace %s" % (deepFlag, files.cmpH5.path)

    @task(inputs={'cmpH5Postsort': mappingCmpH5Postsort},
          outputs={'cmpH5': mappingCmpH5},
          nproc=USE_GLOBAL_NPROC,
          coreTask=True)
    def repack(self, files):
        """Compact the cmp.h5 with h5repack if available (best-effort)."""
        cmd = "((which h5repack && (h5repack -f GZIP=1 %s %s_TMP && mv %s_TMP %s)) || echo 'no h5repack found, continuing w/out')" % (files.cmpH5.path, files.cmpH5.path, files.cmpH5.path, files.cmpH5.path)
        return cmd

    @task(inputs={'cmpH5': mappingCmpH5},
          outputs={'covGFF': coverageGFF},
          toggleable=True,
          enabled=True)
    def covGFF(self, files):
        """Summarize alignment coverage into a GFF file."""
        return 'summarize_coverage.py --numRegions=%d %s %s' % \
            (self.numCoverageRegions, files.cmpH5.path, files.covGFF.path)

    @task(inputs={'covGFF': coverageGFF},
          outputs={'covBED': coverageBED},
          toggleable=True,
          enabled=True)
    def gff2Bed(self, files):
        """Convert the coverage GFF to BED.

        Note the argument order: 'purpose' is a positional argument that
        follows the --name/--description options on the command line.
        """
        name = 'meanCoverage'
        purpose = 'coverage'
        description = 'Mean coverage of genome in fixed interval regions'
        return 'gffToBed.py --name=%s --description="%s" %s %s > %s' % \
            (name, description, purpose, files.covGFF.path, files.covBED.path)

    @task(inputs={'subreads': (ccsFasta, filteredSubreads),
                  'cmpH5': mappingCmpH5,
                  'ctrlCmpH5': (controlCmpH5, None)},
          outputs={'unmapped': unmappedSubreads},
          toggleable=True)
    def unmapped(self, files):
        """When specifying inputs, a tuple of files will be resolved when the initial
        graph is created. The tuple is resolved to the left-most file in the tuple
        that is generated by another task in the graph. If resolving the file reaches
        'None' in the tuple, then the input is dropped. So in order to specify an
        optional input X that doesn't necessarily have to exist, specify (X,None).
        In the task below, the controlCmpH5 will be considered an input to this task
        only if the P_Control module has been run and is providing this file as an output.
        """
        ctrlCmpH5 = files.ctrlCmpH5.path if 'ctrlCmpH5' in files else ""
        return "extractUnmappedSubreads.py %s %s %s > %s" % (files.subreads.path, files.cmpH5.path, ctrlCmpH5, files.unmapped.path)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment