Skip to content

Instantly share code, notes, and snippets.

@apg
Created November 6, 2015 16:43
Show Gist options
  • Save apg/ade83a5823cdff76cbc3 to your computer and use it in GitHub Desktop.
Save apg/ade83a5823cdff76cbc3 to your computer and use it in GitHub Desktop.
package main
import (
"fmt"
"sync"
)
type (
	// Event is a loosely typed record: field name -> arbitrary value.
	Event map[string]interface{}

	// asyncStreamFunc is a stream stage that reads events from in and writes
	// its results to out until in is exhausted or done is closed. The out
	// channel is shared between stages, so a stage must not close it.
	asyncStreamFunc func(done <-chan struct{}, in <-chan Event, out chan<- Event)
)

// multiples maps an event's "type" value to the factor its "value" is
// multiplied by. Types that are not listed yield the zero value, 0.
var multiples = map[interface{}]int{
	"lion":    10000,
	"giraffe": 1,
}
// by partitions the incoming stream on the value of e[field] and runs one
// instance of fn per distinct value. Every partition's fn writes to the single
// channel that by returns.
//
// Events that lack field share one partition (the nil key). The value of
// e[field] is used as a map key, so it must be comparable; a slice or map
// value panics.
//
// The returned channel is closed once in is exhausted (or done is closed) AND
// every fn instance has returned, so no result is lost and no worker can send
// on a closed channel. fn is expected to return when its input channel is
// closed or when done is closed.
func by(field string, fn asyncStreamFunc, done <-chan struct{}, in <-chan Event) <-chan Event {
	retout := make(chan Event)

	go func() {
		var wg sync.WaitGroup
		partitioned := make(map[interface{}]chan Event)

		// Shutdown order matters: this goroutine is the only sender on the
		// partition channels, so it closes them; that lets each worker drain
		// and return; only after all workers are gone is it safe to close
		// the shared output channel.
		defer func() {
			for _, p := range partitioned {
				close(p)
			}
			wg.Wait()
			close(retout)
		}()

		for {
			var e Event
			select {
			case ev, ok := <-in:
				if !ok {
					return
				}
				e = ev
			case <-done:
				return
			}

			// Do we have a partition for this key yet?
			key := e[field]
			out, ok := partitioned[key]
			if !ok {
				out = make(chan Event)
				partitioned[key] = out
				wg.Add(1)
				go func(part <-chan Event) {
					defer wg.Done()
					fn(done, part, retout)
				}(out)
				println("spawned an asq")
			}

			select {
			case out <- e:
			case <-done:
				// Cancelled: stop dispatching instead of spinning
				// through the rest of the input.
				return
			}
		}
	}()

	return retout
}
// asq is an asynchronous stream stage. For every event read from in whose
// "value" field holds an int, it emits a copy of the event on out with
// "value" multiplied by the factor that the package-level multiples table
// holds for the event's "type". Events whose "value" is not an int are
// dropped; a "type" missing from the table multiplies by 0.
//
// asq returns when in is closed or when done is closed, whether it is
// currently waiting to receive or waiting to send. It never closes out,
// because that channel is shared with other stages.
func asq(done <-chan struct{}, in <-chan Event, out chan<- Event) {
	println("Reading on in")
	for {
		select {
		case e, ok := <-in:
			if !ok {
				return
			}
			ni, isInt := e["value"].(int)
			if !isInt {
				continue
			}
			// Work on a copy so the caller's event is left untouched.
			ne := copyEvent(e)
			ne["value"] = ni * multiples[e["type"]]
			select {
			case out <- ne:
			case <-done:
				return
			}
		case <-done:
			// Honour cancellation even while idle on an input that
			// nobody is going to close.
			return
		}
	}
}
// gen returns a closed channel pre-loaded with n events. Event i carries
// "value" = i and a "type" that alternates between "giraffe" (even i) and
// "lion" (odd i). The channel is buffered to hold all n events, so gen never
// blocks.
func gen(n int) <-chan Event {
	kinds := [2]string{"giraffe", "lion"}

	events := make(chan Event, n)
	defer close(events)

	for i := 0; i < n; i++ {
		events <- Event{
			"value": i,
			"type":  kinds[i%2],
		}
	}
	return events
}
// copyEvent returns a new Event holding the same key/value pairs as e. The
// copy is shallow: the values themselves are not duplicated. The result is
// always a non-nil map, even when e is nil.
func copyEvent(e Event) Event {
	dup := make(Event, len(e))
	for key := range e {
		dup[key] = e[key]
	}
	return dup
}
func main() {
done := make(chan struct{})
defer close(done)
gg := gen(1000)
out := by("type", asq, done, gg)
for n := range out {
fmt.Printf("%+v\n", n)
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment