Skip to content

Instantly share code, notes, and snippets.

@smithjessk
Last active November 2, 2016 20:48
Show Gist options
  • Save smithjessk/b02ae1f0c5fbefdd26b1778f62aeda27 to your computer and use it in GitHub Desktop.
Save smithjessk/b02ae1f0c5fbefdd26b1778f62aeda27 to your computer and use it in GitHub Desktop.
Channel-based event aggregator
package main
import (
"fmt"
"time"
"github.com/gocraft/health"
"github.com/gorilla/websocket"
)
// Receiver keeps track of events (receives them from the outside world)
type disperser struct {
stream *health.Stream
eventInput chan int // send events here
aggregatorInput chan chan []int // send "aggregate" calls here
cancellation chan bool // to bring everything down, send "true"
events []int // keeps track of events
listeners []*websocket.Conn
}
func newDisperser() *disperser {
return &disperser{
health.NewStream(),
make(chan int, 10000),
make(chan chan []int),
make(chan bool, 1),
[]int{},
[]*websocket.Conn{},
}
}
func (d *disperser) addListener(conn *websocket.Conn) {
d.listeners = append(d.listeners, conn)
}
// Either does nothing, adds a new event to the current list of events, or
// sends Aggregator the slice of events.
func (d *disperser) listen() {
for true {
select {
case output := <-d.aggregatorInput:
old := d.events
d.events = []int{}
output <- old
case event := <-d.eventInput:
d.events = append(d.events, event)
// fmt.Printf("received event %d\n", event)
default:
// No message! do nothing
}
}
}
func main() {
d := newDisperser()
defer func() {
ce := make(chan []int)
d.aggregatorInput <- ce
fmt.Printf("got %d events, homie\n", len(<-ce))
}()
go d.listen()
go d.getMessages()
go func() {
for i := 0; i < 500000; i++ {
d.eventInput <- i
}
time.Sleep(10 * time.Second)
d.cancellation <- true
}()
<-d.cancellation
fmt.Println("Received from d.cancellation, so dyin'")
}
// Every second, aggregates all the events for the last second.
func (d *disperser) getMessages() {
for true {
time.Sleep(time.Second)
ce := make(chan []int)
d.aggregatorInput <- ce
fmt.Printf("got %d events, homie\n", len(<-ce))
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment