Skip to content

Instantly share code, notes, and snippets.

@mikisvaz
Created February 12, 2019 11:14
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mikisvaz/a65cb99bbf0d7a61f0d41f4e0ca02540 to your computer and use it in GitHub Desktop.
Save mikisvaz/a65cb99bbf0d7a61f0d41f4e0ca02540 to your computer and use it in GitHub Desktop.
Gue code to tie NGS data in project to HTS workflow
extension :bam
dep HTS, :BAM_rescore,
:fastq1 => :placeholder, :fastq2 => :placeholder, :reference => :placeholder,
:sample_name => :placeholder,
:platform_unit => :placeholder,
:read_group_name => :placeholder,
:sequencing_center => "CNAG",
:platform => 'Illuimna',
:library_name => 'LN',
:interval_list => Bellmunt.interval_list do |jobname,options|
sample_name = jobname
options[:sample_name] = sample_name.gsub('_', '.')
options[:reference] = Bellmunt.reference
sample_fastqs = []
Bellmunt.path.glob("*.fastq.gz").each do |file|
basename = File.basename(file)
sample_fastqs << file if basename.split("_")[2] == sample_name
end
case sample_fastqs.length
when 2
machine, lane, sample = File.basename(sample_fastqs.first).split("_")
options[:read_group_name] = [machine, lane] * "."
options[:platform_unit] = [machine, lane, sample] * "."
options[:fastq1] = sample_fastqs.sort.first
options[:fastq2] = sample_fastqs.sort.last
{:inputs => options, :jobname => jobname}
when 4
sample_runs = {}
sample_fastqs.each do |file|
machine, lane, sample = File.basename(file).split("_")
sample_runs[[machine,lane]*"."] ||= []
sample_runs[[machine,lane]*"."] << file
end
jobs = []
num = 1
sample_runs.each do |run_code,files|
run_options = {}
run_options[:read_group_name] = run_code + "." + sample_name
run_options[:platform_unit] = run_code
run_options[:fastq1] = files.sort.first
run_options[:fastq2] = files.sort.last
jobs << {:task => :BAM, :inputs => options.merge(run_options), :jobname => jobname + "_multiplex_" + num.to_s}
num += 1
end
jobs
else
raise "Number of fastq is not 2 or 4: #{Misc.fingerprint sample_fastqs}"
end
end
dep HTS, :BAM_multiplex, :compute => :ignore, :reference => Bellmunt.reference, :bam_files => :placeholder do |jobname,options,dependencies|
if dependencies.flatten.length > 1
{:jobname => jobname, :inputs => options.merge(:bam_files => dependencies.flatten.collect{|dep| dep.path})}
else
[]
end
end
dep_task :BAM, HTS, :BAM_rescore do |jobname,options, dependencies|
if (mutiplex = dependencies.flatten.select{|dep| dep.task_name == :BAM_multiplex}.first)
{:inputs => options.merge("HTS#BAM_duplicates" => mutiplex), :jobname => jobname + '_multiplexed'}
else
[]
end
end
@mikisvaz
Copy link
Author

mikisvaz commented Feb 12, 2019

  dep HTS, :BAM_rescore, 
    :fastq1 => :placeholder, :fastq2 => :placeholder, :reference => :placeholder,
    :sample_name => :placeholder,
    :platform_unit => :placeholder,
    :read_group_name => :placeholder,
    :sequencing_center => "CNAG",
    :platform => 'Illuimna',
    :library_name => 'LN',
    :interval_list => Bellmunt.interval_list do |jobname,options|

This part defines a dependency block, declares it of type :BAM_rescore, and sets a bunch of parameters. This is done here to make the workflow system aware of which parameters are set and which can be asked for and listed in the help. The use of the symbol :placeholder is arbitrary and serves only to make sure we do not ask for those parameters, as we will soon explain.

This declaration could be enough to define a dependency, of type :BAM_rescore, but it would not known how to set the fastq1 and fastq2 parameters and would have to ask for them, for convenience we can fix this, first by setting the :placeholder we just discussed, to make the system not ask for these parameters, and by giving the function a block of code. This block of codes resolves in run time into the actual dependency. In this block of code we can locate the FASTQ files and set the rest of parameters as we which.

There is one important complication in this setup which is that this would all be OK if there where only one set of FASTQ files for each sample, instead in Bellmunt we find that some samples have not one but two pairs of FASTQ. This breaks the default way that the workflow works, and so the flow of information has to be fixed by some plumbing.

Note that this should be fixed properly on the HTS workflow, so that it allows for a more natural way to deal with this case, which is quite common and should not be an awkward exception. How ever it shows some of the flexibility of Rbbt. We will discuss this plumbing in another comment.

@mikisvaz
Copy link
Author

    sample_name = jobname


    options[:sample_name] = sample_name.gsub('_', '.')


    options[:reference] = Bellmunt.reference
    sample_fastqs = []
    Bellmunt.path.glob("*.fastq.gz").each do |file|
      basename = File.basename(file)
      sample_fastqs << file if basename.split("_")[2] == sample_name
end

This code block finds takes the sample name from the jobname given to the task. This is a nice way to organize results, they are all in folders named after the type of information the files hold, with the sample name as file name. The sample name is used to find the FASTQ files; elsewhere is defined the path where these files are located and is what changes from project to project. Also some other parameters can be changed, such as read groups.

case sample_fastqs.length 
    when 2
      machine, lane, sample = File.basename(sample_fastqs.first).split("_")
      options[:read_group_name] = [machine, lane] * "."
      options[:platform_unit] = [machine, lane, sample] * "."
      options[:fastq1] = sample_fastqs.sort.first
      options[:fastq2] = sample_fastqs.sort.last
      {:inputs => options, :jobname => jobname}
    when 4
      sample_runs = {}
      sample_fastqs.each do |file|
        machine, lane, sample = File.basename(file).split("_")
        sample_runs[[machine,lane]*"."] ||= []
        sample_runs[[machine,lane]*"."] << file
      end
      jobs = []
      num = 1
      sample_runs.each do |run_code,files|
        run_options = {}
        run_options[:read_group_name] = run_code + "." + sample_name
        run_options[:platform_unit] = run_code
        run_options[:fastq1] = files.sort.first
        run_options[:fastq2] = files.sort.last
        jobs << {:task => :BAM, :inputs => options.merge(run_options), :jobname => jobname + "_multiplex_" + num.to_s}
        num += 1
      end
      jobs
    else
      raise "Number of fastq is not 2 or 4: #{Misc.fingerprint sample_fastqs}"
end

This part of the code does one part of the plumbing we discussed in the previous comment. If only two fastq files are found they they are just set as the corresponding parameters of the job, along with other parameters, the actual dependency is return as a Hash which defines the inputs of the new task and its jobname. The actual task to be run is assumed to be what was defined in the dep function BAM_rescore. The exceptional part is when 4 fastqs are found. In this case they are separated into two sets of functions that incorporated into not one but two dependency hashes. In fact the code block can return not one but an array of dependencies. Furthermore these dependencies do not even have to be of the same type that was declared, in this case they are of type :BAM, this is because we will inject this dependencies into the dependency graph.

  dep HTS, :BAM_multiplex, :compute => :ignore, :reference => Bellmunt.reference, :bam_files => :placeholder do |jobname,options,dependencies|
    if dependencies.flatten.length > 1
      {:jobname => jobname, :inputs => options.merge(:bam_files => dependencies.flatten.collect{|dep| dep.path})}
    else
      []
    end
end

This part of the code injects a dependency of type multiplex, it is an additional step that is only used when there are two :BAM dependencies instead of just one :BAM_rescore. If there is only one :BAM_rescore then the block of code returns no dependencies and nothing is done. If there are two :BAM files then a new dependency that takes these files and returns a merged version of these files with duplicates marked.

 dep_task :BAM, HTS, :BAM_rescore do |jobname,options, dependencies|
    if (mutiplex = dependencies.flatten.select{|dep| dep.task_name == :BAM_multiplex}.first)
      {:inputs => options.merge("HTS#BAM_duplicates" =>  mutiplex), :jobname => jobname + '_multiplexed'}
    else
      []
    end
end

This is the last part of the plumbing. Here, if we find that we have a :BAM_multiplex dependency and are thus in the exceptional case, we return a :BAM_rescore with an altered dependency tree. This alteration consists of assigning the :BAM_duplications, which is normally the step where duplicates are marked, with the :BAM_multiplex dependency we just setup. This :BAM_rescore we have setup is return as an additional dependency. Remember that in the exceptional case we didn't actually return a :BAM_rescore dependency, but two :BAM dependencies. Botton line is that:

  1. either there are only two FASTQ files and then a :BAM_rescore dependency is setup and return and nothing else happens or
  2. there are two sets of two FASTQ files, in which case two :BAM , a :BAM_multiplex, and a :BAM_rescore dependencies are return with some custom plumbing.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment