Last active
August 29, 2015 14:04
-
-
Save mikedewar/6dce654c8503b404595b to your computer and use it in GitHub Desktop.
a stream processing library in go, where every function only accepts channels as inputs and only returns channels as outputs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
// merge takes two streams and turns them into one stream | |
func merge(a, b chan interface{}) chan interface{} { | |
o := make(chan interface{}) | |
go func() { | |
for { | |
select { | |
case m := <-a: | |
o <- m | |
case m := <-b: | |
o <- m | |
} | |
} | |
}() | |
return o | |
} | |
// regulate emits a message from a only upon recieving a message from b | |
func regulate(a chan interface{}, b chan interface{}) chan interface{} { | |
o := make(chan interface{}) | |
go func() { | |
for { | |
<-b | |
select { | |
case n := <-a: | |
o <- n | |
default: | |
continue | |
} | |
} | |
}() | |
return o | |
} | |
// accumulate emits all the messages that have arrived since the last message from b | |
func accumulate(a chan interface{}, b chan interface{}) chan interface{} { | |
o := make(chan interface{}) | |
go func() { | |
queue := make([]interface{}, 0) | |
for { | |
select { | |
case m := <-a: | |
queue = append(queue, m) | |
case <-b: | |
for _, v := range queue { | |
o <- v | |
} | |
queue = make([]interface{}, 0) | |
} | |
} | |
}() | |
return o | |
} | |
// alternating split returns two streams, each one getting every other message from a | |
func alternatingSplit(a chan interface{}) (chan interface{}, chan interface{}) { | |
o1 := make(chan interface{}) | |
o2 := make(chan interface{}) | |
go func() { | |
i := true | |
for m := range a { | |
if i { | |
o1 <- m | |
} else { | |
o2 <- m | |
} | |
i = !i | |
} | |
}() | |
return o1, o2 | |
} | |
func zip(a, b chan interface{}) chan interface{} { | |
o := make(chan interface{}) | |
go func() { | |
for { | |
m := <-a | |
n := <-b | |
o <- m | |
o <- n | |
} | |
}() | |
return o | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment