Skip to content

Instantly share code, notes, and snippets.

@nikhan
Last active August 29, 2015 14:04
Show Gist options
  • Save nikhan/827386c05ef1228e7b24 to your computer and use it in GitHub Desktop.
Save nikhan/827386c05ef1228e7b24 to your computer and use it in GitHub Desktop.
package main
import (
"fmt"
"math/rand"
"time"
)
// Core types used by the stream pipeline helpers in this file.
type (
	// Stream is a channel that carries values of any type.
	Stream chan interface{}

	// StreamMap converts one stream value into another; Stream.Map applies it
	// to every value it receives.
	StreamMap func(interface{}) interface{}

	// StreamFilter is a predicate over stream values; Stream.Filter forwards
	// only the values for which it returns true.
	StreamFilter func(interface{}) bool
)
// Tick returns a new Stream on which the string "tick" is sent once per
// interval.
//
// S is a duration string in the format accepted by time.ParseDuration, for
// example "200ms" or "1s". The receiver is not read; it only acts as the
// starting point of a method chain such as make(Stream).Tick("1s").
//
// Tick panics in the calling goroutine when S cannot be parsed or does not
// describe a positive duration.
//
// The ticking goroutine has no stop mechanism and runs until the program
// exits. Sends on the returned Stream are unbuffered, so a tick is only
// delivered once a receiver is ready.
func (s Stream) Tick(S string) Stream {
	// Validate up front: time.NewTicker panics on a non-positive interval, and
	// a panic raised inside the goroutine below could not be recovered by the
	// caller and would carry no hint about the offending input.
	dur, err := time.ParseDuration(S)
	if err != nil {
		panic(fmt.Sprintf("Stream.Tick: invalid duration %q: %v", S, err))
	}
	if dur <= 0 {
		panic(fmt.Sprintf("Stream.Tick: duration must be positive, got %q", S))
	}

	out := make(Stream)
	go func() {
		ticker := time.NewTicker(dur)
		for range ticker.C {
			out <- "tick"
		}
	}()
	return out
}
// Map returns a new Stream that carries a(v) for every value v received from
// s, in the order the values arrive.
//
// When s is closed, the returned Stream is closed too and the forwarding
// goroutine exits. If s is never closed the goroutine runs for the lifetime
// of the program. Sends on the returned Stream are unbuffered.
func (s Stream) Map(a StreamMap) Stream {
	out := make(Stream)
	go func() {
		// Ranging (rather than a bare `<-s` loop) matters: a receive from a
		// closed channel returns nil immediately, which would make the loop
		// spin forever feeding nil into a.
		defer close(out)
		for v := range s {
			out <- a(v)
		}
	}()
	return out
}
// Filter returns a new Stream that carries only those values from s for which
// a returns true, preserving their order.
//
// When s is closed, the returned Stream is closed too and the forwarding
// goroutine exits. If s is never closed the goroutine runs for the lifetime
// of the program. Sends on the returned Stream are unbuffered.
func (s Stream) Filter(a StreamFilter) Stream {
	out := make(Stream)
	go func() {
		// Ranging stops at end-of-stream; a bare `<-s` loop would receive nil
		// from a closed channel without blocking and spin forever.
		defer close(out)
		for v := range s {
			if a(v) {
				out <- v
			}
		}
	}()
	return out
}
// Log starts a goroutine that prints every value received from s with
// fmt.Println, then returns s itself.
//
// Because the returned Stream is the receiver, the logging goroutine consumes
// the values it prints: any other receiver on the same Stream competes with
// it and sees only the values the logger did not take.
//
// The goroutine exits when s is closed; if s is never closed it runs for the
// lifetime of the program.
func (s Stream) Log() Stream {
	go func() {
		// Ranging stops at end-of-stream instead of printing the nil values a
		// closed channel yields forever.
		for v := range s {
			fmt.Println(v)
		}
	}()
	return s
}
// Zip returns a new Stream that interleaves s and the streams in b in
// round-robin order: one value from s, then one from each stream in b in
// argument order, then back to s, and so on. Each receive blocks until that
// particular source has a value, so the output advances at the pace of the
// slowest source.
//
// As soon as any source is found closed, the returned Stream is closed and
// the goroutine exits; values already forwarded in the current round are not
// retracted. If no source is ever closed the goroutine runs for the lifetime
// of the program.
func (s Stream) Zip(b ...Stream) Stream {
	// Copy the sources before returning so the goroutine never reads the
	// caller's slice, which the caller may reuse after this call.
	sources := make([]Stream, 0, len(b)+1)
	sources = append(sources, s)
	sources = append(sources, b...)

	out := make(Stream)
	go func() {
		defer close(out)
		for {
			for _, src := range sources {
				v, ok := <-src
				if !ok {
					// A closed source would otherwise yield nil forever
					// without blocking.
					return
				}
				out <- v
			}
		}
	}()
	return out
}
// merge returns a new Stream that carries every value received from either s
// or b, in whatever order the values become available.
//
// An input that is closed is dropped from the select; once both inputs are
// exhausted the returned Stream is closed and the goroutine exits. A nil
// input is treated as already exhausted. If neither input is ever closed the
// goroutine runs for the lifetime of the program.
func (s Stream) merge(b Stream) Stream {
	out := make(Stream)
	go func() {
		defer close(out)
		left, right := s, b
		for left != nil || right != nil {
			select {
			case v, ok := <-right:
				if !ok {
					// A closed channel is always ready and would flood the
					// output with nil; a nil channel is never selected.
					right = nil
					continue
				}
				out <- v
			case v, ok := <-left:
				if !ok {
					left = nil
					continue
				}
				out <- v
			}
		}
	}()
	return out
}
// Merge returns a new Stream that carries every value received from s and
// from each stream in b. The inputs are combined pairwise with merge, so
// values are delivered in whatever order they become available; no ordering
// between different inputs is guaranteed.
//
// The returned Stream is closed once the combined stream is closed, at which
// point the forwarding goroutine exits. With no arguments Merge forwards s
// unchanged onto a fresh Stream.
func (s Stream) Merge(b ...Stream) Stream {
	combined := s
	for _, other := range b {
		combined = combined.merge(other)
	}

	out := make(Stream)
	go func() {
		// Ranging stops at end-of-stream; a bare receive loop would get nil
		// from a closed channel without blocking and spin forever.
		defer close(out)
		for v := range combined {
			out <- v
		}
	}()
	return out
}
func main() {
streamA := func(a interface{}) interface{}{
return "a"
}
streamB := func(a interface{}) interface{}{
return "b"
}
streamC := func(a interface{}) interface{}{
return "c"
}
streamD := func(a interface{}) interface{}{
return "d"
}
streamE := func(a interface{}) bool{
if rand.Float64() > .5 {
return true
}
return false
}
a := make(Stream).Tick("200ms").Map(streamA)
b := make(Stream).Tick("300ms").Map(streamB)
c := make(Stream).Tick("100ms").Map(streamD)
e := make(Stream).Tick("200ms").Filter(streamE)
make(Stream).Tick("1s").Map(streamC).Zip(a,b).Merge(c,e).Log()
<-make(chan bool)
}
@nikhan
Copy link
Author

nikhan commented Aug 20, 2014

💩

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment