Skip to content

Instantly share code, notes, and snippets.

@nikhan
Created September 11, 2014 03:07
Show Gist options
  • Save nikhan/91da4505ee860a9be275 to your computer and use it in GitHub Desktop.
Save nikhan/91da4505ee860a9be275 to your computer and use it in GitHub Desktop.
simple data pipeline
package main
import (
"time"
"fmt"
)
// Function types used to build pipeline stages.
type (
	// Task is a pipeline step: it receives one message and returns the
	// message to pass downstream.
	Task func(interface{}) interface{}

	// Sink is a terminal pipeline step: it receives one message and
	// returns nothing.
	Sink func(interface{})
)
// TaskRoutine runs one pipeline stage. It applies f to every message
// received on in and sends the result on out. Once in has been closed and
// drained, it prints "closing task" and closes out so the next stage can
// finish as well. It blocks until then, so callers normally start it with
// the go statement.
func TaskRoutine(f Task, in <-chan interface{}, out chan<- interface{}) {
	for {
		msg, ok := <-in
		if !ok {
			// in is closed and empty: no more work for this stage.
			break
		}
		out <- f(msg)
	}
	fmt.Println("closing task")
	close(out)
}
// TaskSink is the final pipeline stage. It calls f with every message
// received on in and, once in has been closed and drained, prints
// "closing sink" and returns.
func TaskSink(f Sink, in <-chan interface{}) {
	for {
		msg, open := <-in
		if !open {
			fmt.Println("closing sink")
			return
		}
		f(msg)
	}
}
// main builds a three-stage pipeline (add 1 -> divide by 10 -> print) and
// feeds it an incrementing float64 once per second. After five seconds it
// closes the pipeline input and waits for the sink to finish before
// exiting.
func main() {
	// The stage functions assert float64 because that is the only type
	// main ever sends into the pipeline.
	add := func(m interface{}) interface{} {
		return m.(float64) + 1.0
	}
	div := func(m interface{}) interface{} {
		return m.(float64) / 10.0
	}
	prn := func(m interface{}) {
		fmt.Println(m)
	}

	addIn := make(chan interface{})
	addOut := make(chan interface{})
	divOut := make(chan interface{})
	// done is closed when the sink returns, i.e. after every in-flight
	// message has been printed and every stage has shut down. Waiting on it
	// replaces a fixed sleep, which could not guarantee the pipeline had
	// drained.
	done := make(chan struct{})

	go TaskRoutine(add, addIn, addOut)
	go TaskRoutine(div, addOut, divOut)
	go func() {
		defer close(done)
		TaskSink(prn, divOut)
	}()

	stop := time.NewTimer(5 * time.Second)
	defer stop.Stop()
	tick := time.NewTicker(1 * time.Second)
	defer tick.Stop() // release the ticker's resources on exit

	c := 0.0
	for {
		select {
		case <-tick.C:
			c += 1.0
			addIn <- c
		case <-stop.C:
			// Closing the input cascades: each stage closes its output
			// once its own input is drained, ending with the sink.
			close(addIn)
			<-done
			return
		}
	}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment