Skip to content

Instantly share code, notes, and snippets.

@mikedewar
Last active August 29, 2015 14:04
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save mikedewar/6dce654c8503b404595b to your computer and use it in GitHub Desktop.
Save mikedewar/6dce654c8503b404595b to your computer and use it in GitHub Desktop.
a stream processing library in go, where every function only accepts channels as inputs and only returns channels as outputs
package main
// merge takes two streams and turns them into one stream
func merge(a, b chan interface{}) chan interface{} {
o := make(chan interface{})
go func() {
for {
select {
case m := <-a:
o <- m
case m := <-b:
o <- m
}
}
}()
return o
}
// regulate emits a message from a only upon recieving a message from b
func regulate(a chan interface{}, b chan interface{}) chan interface{} {
o := make(chan interface{})
go func() {
for {
<-b
select {
case n := <-a:
o <- n
default:
continue
}
}
}()
return o
}
// accumulate emits all the messages that have arrived since the last message from b
func accumulate(a chan interface{}, b chan interface{}) chan interface{} {
o := make(chan interface{})
go func() {
queue := make([]interface{}, 0)
for {
select {
case m := <-a:
queue = append(queue, m)
case <-b:
for _, v := range queue {
o <- v
}
queue = make([]interface{}, 0)
}
}
}()
return o
}
// alternating split returns two streams, each one getting every other message from a
func alternatingSplit(a chan interface{}) (chan interface{}, chan interface{}) {
o1 := make(chan interface{})
o2 := make(chan interface{})
go func() {
i := true
for m := range a {
if i {
o1 <- m
} else {
o2 <- m
}
i = !i
}
}()
return o1, o2
}
func zip(a, b chan interface{}) chan interface{} {
o := make(chan interface{})
go func() {
for {
m := <-a
n := <-b
o <- m
o <- n
}
}()
return o
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment