Skip to content

Instantly share code, notes, and snippets.

@samuell
Created August 17, 2020 13:03
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save samuell/63ad3c3149b7519b33f22826fad353a7 to your computer and use it in GitHub Desktop.
Save samuell/63ad3c3149b7519b33f22826fad353a7 to your computer and use it in GitHub Desktop.
// Workflow written in SciPipe.
// For more information about SciPipe, see: http://scipipe.org
package main
import sp "github.com/scipipe/scipipe"
func main() {
	// Set up the workflow, using 4 cpu cores
	workflow := sp.NewWorkflow("my_workflow", 4)

	// Producer: writes the text "foo" into foo.txt
	fooer := workflow.NewProc("fooer", "echo foo > {o:foo}")
	fooer.SetOut("foo", "foo.txt")

	// Filter: receives the producer's output on its in-port
	existing := NewFilterNonexistent(workflow, "filter_nonexistent")
	fooOut := fooer.Out("foo")
	existing.In().From(fooOut)

	// Consumer: copies whatever the filter lets through into <input path>.copy
	copier := workflow.NewProc("copy", "cat {i:in} > {o:out}")
	filtered := existing.Out()
	copier.In("in").From(filtered)
	copier.SetOut("out", "{i:in}.copy")

	// Execute the whole network
	workflow.Run()
}
// NewFilterNonexistent builds a FilterNonexistent process called name,
// gives it one in-port ("in") and one out-port ("out"), registers it with
// the workflow wf, and returns it.
func NewFilterNonexistent(wf *sp.Workflow, name string) *FilterNonexistent {
	proc := new(FilterNonexistent)
	proc.BaseProcess = sp.NewBaseProcess(wf, name)

	// The process itself is passed as the owner of each port.
	proc.InitInPort(proc, "in")
	proc.InitOutPort(proc, "out")

	wf.AddProc(proc)
	return proc
}
// FilterNonexistent is a process that checks each file IP received on its
// in-port and, if that IP reports that it exists, sends a newly created file
// IP for the same path on its out-port. IPs that do not exist are dropped.
type FilterNonexistent struct {
// Embedded base process; the port helpers called on this type
// (InitInPort, InitOutPort, InPort, OutPort, CloseAllOutPorts) come from it.
sp.BaseProcess
}
// In is a convenience accessor for the single in-port, named "in".
func (p *FilterNonexistent) In() *sp.InPort {
	return p.InPort("in")
}
// Out is a convenience accessor for the single out-port, named "out".
func (p *FilterNonexistent) Out() *sp.OutPort {
	return p.OutPort("out")
}
// Run drains the in-port until its channel is closed. For every incoming IP
// that exists, a new file IP with the same path is sent on the out-port;
// all other IPs are skipped. All out-ports are closed when Run returns.
func (p *FilterNonexistent) Run() {
	defer p.CloseAllOutPorts()

	for candidate := range p.In().Chan {
		// Drop anything that is not present
		if !candidate.Exists() {
			continue
		}
		forwarded := sp.NewFileIP(candidate.Path())
		p.Out().Send(forwarded)
	}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment