Skip to content

Instantly share code, notes, and snippets.

@asw456
Forked from mikedewar/streamgo.go
Created August 1, 2014 04:02
Show Gist options
  • Save asw456/3f254031900c7e3fac5a to your computer and use it in GitHub Desktop.
Save asw456/3f254031900c7e3fac5a to your computer and use it in GitHub Desktop.
package main
import (
"math/rand"
)
// merge takes two streams and turns them into one stream.
//
// Messages received on a and b are forwarded to the returned channel in the
// order they become available. When one input is closed, merge keeps
// forwarding from the other; once both inputs are closed (or nil) the
// returned channel is closed and the forwarding goroutine exits.
func merge(a, b chan []byte) chan []byte {
	o := make(chan []byte)
	go func() {
		defer close(o)
		// A nil channel is never ready in a select, so replacing a closed
		// input with nil removes it from consideration instead of spinning
		// on the zero values a closed channel yields.
		for a != nil || b != nil {
			select {
			case m, ok := <-a:
				if !ok {
					a = nil
					continue
				}
				o <- m
			case m, ok := <-b:
				if !ok {
					b = nil
					continue
				}
				o <- m
			}
		}
	}()
	return o
}
// regulate emits a message from a only upon receiving a message from b.
//
// Every message on b acts as a trigger. If a message can be received from a
// without blocking at that moment, it is forwarded to the returned channel;
// otherwise the trigger is dropped and nothing is emitted.
//
// The returned channel is closed, and the goroutine exits, when b is closed
// or when a trigger finds that a has been closed.
func regulate(a chan []byte, b chan []byte) chan []byte {
	o := make(chan []byte)
	go func() {
		defer close(o)
		// Ranging over b ends the loop when b is closed, rather than
		// spinning on the zero values a closed channel yields.
		for range b {
			select {
			case n, ok := <-a:
				if !ok {
					// a is closed: there will never be anything to emit.
					return
				}
				o <- n
			default:
				// Nothing is waiting on a; drop this trigger.
			}
		}
	}()
	return o
}
// accumulate emits all the messages that have arrived on a since the last
// message from b.
//
// Messages from a are queued in arrival order. Each message on b flushes the
// queue, in order, to the returned channel.
//
// Shutdown: when a is closed no further messages are queued, and the next
// message on b flushes what is left and then closes the returned channel.
// When b is closed the queue is flushed one final time and the returned
// channel is closed. In both cases the goroutine exits.
func accumulate(a chan []byte, b chan []byte) chan []byte {
	o := make(chan []byte)
	go func() {
		defer close(o)
		var queue [][]byte
		flush := func() {
			for _, v := range queue {
				o <- v
			}
			// Keep the backing array for the next batch.
			queue = queue[:0]
		}
		for {
			select {
			case m, ok := <-a:
				if !ok {
					// A nil channel is never ready, so this stops the
					// select from spinning on the closed input.
					a = nil
					continue
				}
				queue = append(queue, m)
			case _, ok := <-b:
				flush()
				if !ok || a == nil {
					// No more triggers, or no more input: nothing left to do.
					return
				}
			}
		}
	}()
	return o
}
// sample flips a coin to decide whether to emit or reject each message on a.
//
// For every message received from a, one rand.Float64 draw is made; the
// message is forwarded to the returned channel when the draw is greater than
// 0.5 and discarded otherwise. When a is closed the returned channel is
// closed and the goroutine exits.
func sample(a chan []byte) chan []byte {
	o := make(chan []byte)
	go func() {
		defer close(o)
		// Ranging over a stops at close instead of looping on zero values.
		for m := range a {
			if rand.Float64() > 0.5 {
				o <- m
			}
		}
	}()
	return o
}
// alternatingSplit returns two streams, each one getting every other message
// from a.
//
// The first message goes to the first returned channel, the second to the
// second, and so on. When a is closed both returned channels are closed so
// that consumers ranging over them terminate.
func alternatingSplit(a chan []byte) (chan []byte, chan []byte) {
	o1 := make(chan []byte)
	o2 := make(chan []byte)
	go func() {
		defer close(o1)
		defer close(o2)
		toFirst := true
		for m := range a {
			if toFirst {
				o1 <- m
			} else {
				o2 <- m
			}
			toFirst = !toFirst
		}
	}()
	return o1, o2
}
// main wires the stream helpers into a small demonstration graph: the two
// source channels are merged, the merged stream is regulated by the first
// source, sampled, and split in two; separately, the second source is
// accumulated against the first. Nothing is sent on the sources and the
// resulting streams are discarded, so main returns as soon as the graph has
// been constructed.
func main() {
	var (
		first  = make(chan []byte)
		second = make(chan []byte)
	)
	merged := merge(first, second)
	regulated := regulate(merged, first)
	sampled := sample(regulated)
	alternatingSplit(sampled)
	accumulate(second, first)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment