Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
// Workflow written in SciPipe.
// For more information about SciPipe, see: http://scipipe.org
package main
import sp "github.com/scipipe/scipipe"
// main wires up a three-step example workflow: a process that writes foo.txt,
// a filter that only lets existing files through, and a process that copies
// whatever the filter forwards. It then runs the workflow.
func main() {
	// The workflow is named "my_workflow" and created with the value 4
	// (described by the original author as using 4 cpu cores).
	wf := sp.NewWorkflow("my_workflow", 4)

	// Producer: writes "foo" into the file bound to its "foo" out-port.
	fooer := wf.NewProc("fooer", "echo foo > {o:foo}")
	fooer.SetOut("foo", "foo.txt")

	// Filter: receives the producer's output and passes on existing files.
	existing := NewFilterNonexistent(wf, "filter_nonexistent")
	existing.In().From(fooer.Out("foo"))

	// Consumer: copies each file that made it through the filter.
	copier := wf.NewProc("copy", "cat {i:in} > {o:out}")
	copier.In("in").From(existing.Out())
	copier.SetOut("out", "{i:in}.copy")

	// Execute everything that was connected above.
	wf.Run()
}
// NewFilterNonexistent builds a FilterNonexistent process called name, gives
// it one in-port ("in") and one out-port ("out"), registers it with the
// workflow wf and returns it.
func NewFilterNonexistent(wf *sp.Workflow, name string) *FilterNonexistent {
	proc := new(FilterNonexistent)
	proc.BaseProcess = sp.NewBaseProcess(wf, name)

	// Ports must exist before the process is added to the workflow.
	proc.InitInPort(proc, "in")
	proc.InitOutPort(proc, "out")

	wf.AddProc(proc)
	return proc
}
// FilterNonexistent checks each file retrieved on the in-port and, if that
// particular path does exist, forwards it on the out-port (as a new file IP
// created from the same path). Files whose path does not exist are dropped.
type FilterNonexistent struct {
// Embedded base process; presumably the source of the port helpers
// (InitInPort, InitOutPort, InPort, OutPort, CloseAllOutPorts) that this
// file calls on a FilterNonexistent.
sp.BaseProcess
}
// In gives access to the process's single in-port, registered as "in".
func (p *FilterNonexistent) In() *sp.InPort {
	port := p.InPort("in")
	return port
}
// Out gives access to the process's single out-port, registered as "out".
func (p *FilterNonexistent) Out() *sp.OutPort {
	port := p.OutPort("out")
	return port
}
// Run drains the in-port channel and, for every incoming IP whose file
// exists, sends a new file IP with the same path on the out-port. IPs whose
// file does not exist are skipped. All out-ports are closed when Run returns.
func (p *FilterNonexistent) Run() {
	defer p.CloseAllOutPorts()

	for ip := range p.In().Chan {
		// Drop anything that is not present on disk.
		if !ip.Exists() {
			continue
		}
		p.Out().Send(sp.NewFileIP(ip.Path()))
	}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.