Skip to content

Instantly share code, notes, and snippets.

@ihodes
Last active February 10, 2017 21:23
Show Gist options
  • Save ihodes/3b928d494789b3b345af28400f7978ec to your computer and use it in GitHub Desktop.
Save ihodes/3b928d494789b3b345af28400f7978ec to your computer and use it in GitHub Desktop.
#use "topfind";;
#thread
#require "coclobas.ketrew_backend,biokepi,cmdliner,nonstd,sosa,csv";;
(* You must have a biokepi_machine.ml file in this directory that has a
`biokepi_machine` defined at the top level. *)
#use "biokepi_machine.ml"
open Nonstd
module String = Sosa.Native_string
(** We want to extend the compiler to handle a new function,
`write_csv_manifest`, so we define the new signature that the compiler must
have. *)
module type Semantics = sig
include Biokepi.EDSL.Semantics
val write_csv_manifest :
normal:[ `Bam ] repr ->
tumor:[ `Bam ] repr ->
vcfs:(string * [ `Vcf ] repr) list ->
string -> unit
end
(** Here we add the function itself for the `To_workflow` compiler (the compiler
which handles the transformation from the eDSL to actual Ketrew workflow
nodes). All we're adding is a function which outputs a CSV locally when the
workflow is compiled. *)
module To_workflow
(Config : Biokepi.EDSL.Compile.To_workflow.Compiler_configuration) =
struct
include Biokepi.EDSL.Compile.To_workflow.Make(Config)
let write_csv_manifest ~normal ~tumor ~vcfs out
=
let csv =
let module F = Biokepi.EDSL.Compile.To_workflow.File_type_specification in
let header = ["name"; "path"] in
let vcfs =
List.map ~f:(fun (name, n) -> [name; (F.get_vcf n)#product#path]) vcfs
in
[
header;
["normal"; (F.get_bam normal)#product#path];
["tumor"; (F.get_bam tumor)#product#path];
] @ vcfs
in
let outc = Csv.to_channel (open_out out) in
Csv.output_all outc csv
end
(** The actual pipeline that does our work. *)
module Pipeline(Bfx: Semantics) = struct
module Stdlib = Biokepi.EDSL.Library.Make(Bfx)
(** This takes our list of FASTQs (pairs, or single-ended) and aligns them,
then merges them all together. *)
let to_bam ~reference_build fastqs =
Stdlib.bwa_mem_opt_inputs fastqs
|> List.map ~f:(Bfx.bwa_mem_opt ~reference_build ?configuration:None)
|> Bfx.list
|> Bfx.merge_bams
|> Bfx.picard_mark_duplicates
let pipeline ~normal_fastqs ~tumor_fastqs ~reference_build ~results_manifest =
let normal = to_bam ~reference_build normal_fastqs in
let tumor = to_bam ~reference_build tumor_fastqs in
let strelka_vcf = Bfx.strelka ~normal ~tumor () in
let mutect_vcf = Bfx.mutect ~normal ~tumor () in
let normal_haplotype_vcf = Bfx.gatk_haplotype_caller normal in
let tumor_haplotype_vcf = Bfx.gatk_haplotype_caller tumor in
let () =
let vcfs = ["strelka", strelka_vcf; "mutect", mutect_vcf;
"normal_haplotype", normal_haplotype_vcf;
"tumor_haplotype", tumor_haplotype_vcf] in
Bfx.write_csv_manifest ~normal ~tumor ~vcfs results_manifest
in
Bfx.list [Bfx.to_unit normal;
Bfx.to_unit tumor;
Bfx.to_unit strelka_vcf;
Bfx.to_unit mutect_vcf;
Bfx.to_unit tumor_haplotype_vcf;
Bfx.to_unit normal_haplotype_vcf]
let parse_input_fastqs ~sample_name files =
let open Biokepi.EDSL.Library.Input in
let parse_file file =
match (String.split ~on:(`Character ';') file) with
| [ pair1; pair2; ] -> pe pair1 pair2
| [ single_end; ] -> se single_end
| _ -> failwith "Couldn't parse FASTQ."
in
fastq_sample ~sample_name (List.map ~f:(fun f -> parse_file f) files)
let run ~normal_paths ~tumor_paths ~name ~reference_build ~results_manifest =
let normal_fastqs =
parse_input_fastqs normal_paths ~sample_name:(sprintf "normal-%s" name) in
let tumor_fastqs =
parse_input_fastqs tumor_paths ~sample_name:(sprintf "tumor-%s" name) in
Bfx.observe (fun () ->
pipeline
~normal_fastqs ~tumor_fastqs ~reference_build ~results_manifest
|> Bfx.to_unit)
end
(** These describe the CLI interface, and is not specific to Biokepi at all
(uses the Cmdliner library). *)
let args f =
let open Cmdliner in
let open Cmdliner.Term in
app f begin
(* We use these "wrapper" functions adding polymorphic types in order to
further type the various returned bools and strings from the CLI. *)
pure (fun s -> `Name s)
$ Arg.(
required & opt (some string) None
& info ["name"]
~doc:"Name of the run (unique per sample/configuration).")
end
$ begin
pure (fun s -> `Results_dir s)
$ Arg.(
required & opt (some string) None
& info ["results-directory"]
~doc:"Directory where all files related to this run will be stored.")
end
$ begin
pure (fun s -> `Results_manifest s)
$ Arg.(
required & opt (some string) None
& info ["local-results-manifest-path"; "R"]
~doc:"Local path where CSV with all results/files listed will be created.")
end
$ begin
pure (fun s -> `Dry_run s)
$ Arg.(
value & flag & info ["dry-run"]
~doc:"Don't submit this job to Ketrew (test that it compiles).")
end
$ begin
pure (fun s -> `Normal_fastqs s)
$ Arg.(
required & opt (some (list string)) None
& info ["normal-fastqs"]
~doc:"Path to normal FASTQs, comma-separated pairs separated by\
semicolons, e.g. (a.r1.fq.gz;a.r2.fq.gz,b.r1.fq.gz;b.r2.fq.gz)")
end
$ begin
pure (fun s -> `Tumor_fastqs s)
$ Arg.(
required & opt (some (list string)) None
& info ["tumor-fastqs"]
~doc:"Path to tumor FASTQs, comma-separated pairs separated by\
semicolons, e.g. (a.r1.fq.gz;a.r2.fq.gz,b.r1.fq.gz;b.r2.fq.gz)")
end
$ begin
let references =
["b37decoy", "b37decoy"; "b37", "b37"; "b38", "b38"; "mm10", "mm10"] in
pure (fun s -> `Reference_build s)
$ Arg.(
required & opt (some (enum references)) None
& info ["reference-build"; "R"]
~doc:"Reference build to use.")
end
(** Here we compile & submit the pipeline. *)
let run () =
fun
(`Name name)
(`Results_dir results_dir)
(`Results_manifest results_manifest)
(`Dry_run dry_run)
(`Normal_fastqs normal_paths)
(`Tumor_fastqs tumor_paths)
(`Reference_build reference_build)
->
(** This Config module is required to pass configuration & the
Biokepi.Machine.t (describing the compute infrastructure) to the
To_workflow compiler. This is used to compile the above pipeline to
a bunch of Ketrew jobs. *)
let module Config = struct
include Biokepi.EDSL.Compile.To_workflow.Defaults
(* This is where all the files generated by the workflow will end up. *)
let work_dir = results_dir
let machine = biokepi_machine
end in
let module Workflow_compiler =
To_workflow(Config) in
let module Compiled_pipeline = Pipeline(Workflow_compiler) in
let workflow =
Compiled_pipeline.run
~normal_paths ~tumor_paths ~name ~reference_build ~results_manifest
|> Biokepi.EDSL.Compile.To_workflow.File_type_specification.
get_unit_workflow ~name
in
if dry_run then
printf "Dry run, not submitting!\n"
else
let _ = Ketrew.Client.submit_workflow workflow ~add_tags:[name] in
printf "Submitted workflow \"%s\"!\n" name
(** Here we join the arguments in `args` with the actual `run` function taking
those arguments into the complete CLI. *)
let cli () =
let open Cmdliner in
let info = Term.(info "Tim's pipeline") in
let term = Term.(pure (run ())) |> args in
(term, info)
(** This is the "main" function that's actually called. *)
let () =
let open Cmdliner in
match Term.eval (cli ()) with
| `Error _ -> exit 1
| `Ok f -> f
| _ -> exit 0
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment