Skip to content

Instantly share code, notes, and snippets.

@danield137
Created March 7, 2017 10:14
Show Gist options
  • Save danield137/9114d18a89bc092fcbc105c169212bb1 to your computer and use it in GitHub Desktop.
Save danield137/9114d18a89bc092fcbc105c169212bb1 to your computer and use it in GitHub Desktop.
amature pipes in go
package main
import (
"fmt"
"time"
)
func main() {
raw := NewRawPipe()
agg := NewAggregatedPipe(raw.out)
out := NewOutPipe(agg.out)
flow := [3]Executor{out, agg, raw}
for _, item := range flow {
go item.Execute()
}
// allow code to run
time.Sleep(1 * time.Second)
}
type RawPipe struct {
out chan string
}
type AggregatedPipe struct {
in chan string
out chan string
}
type OutPipe struct {
in chan string
}
func NewRawPipe() *RawPipe {
return &RawPipe{
out: make(chan string),
}
}
func NewAggregatedPipe(in chan string) *AggregatedPipe {
return &AggregatedPipe{
in: in,
out: make(chan string),
}
}
func NewOutPipe(in chan string) *OutPipe {
return &OutPipe{
in: in,
}
}
type Executor interface {
Execute()
}
func fillStream(c chan string) {
c <- "*"
}
func (rp *RawPipe) Execute() {
fmt.Println("func A")
rp.out <- "a"
rp.out <- "b"
go fillStream(rp.out)
rp.out <- "d"
go fillStream(rp.out)
rp.out <- "e"
}
func (ap *AggregatedPipe) Execute() {
str := ""
fmt.Println("func B")
for {
select
{
case s := <-ap.in:
{
str += s
if len(str)%3 == 0 {
ap.out <- str
str = ""
}
}
}
}
}
func (op *OutPipe) Execute() {
fmt.Println("func C")
for {
select
{
case s := <-op.in:
fmt.Println(s)
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment