Last active
November 2, 2016 20:48
-
-
Save smithjessk/b02ae1f0c5fbefdd26b1778f62aeda27 to your computer and use it in GitHub Desktop.
Channel-based event aggregator
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"fmt" | |
"time" | |
"github.com/gocraft/health" | |
"github.com/gorilla/websocket" | |
) | |
// Receiver keeps track of events (receives them from the outside world) | |
type disperser struct { | |
stream *health.Stream | |
eventInput chan int // send events here | |
aggregatorInput chan chan []int // send "aggregate" calls here | |
cancellation chan bool // to bring everything down, send "true" | |
events []int // keeps track of events | |
listeners []*websocket.Conn | |
} | |
func newDisperser() *disperser { | |
return &disperser{ | |
health.NewStream(), | |
make(chan int, 10000), | |
make(chan chan []int), | |
make(chan bool, 1), | |
[]int{}, | |
[]*websocket.Conn{}, | |
} | |
} | |
func (d *disperser) addListener(conn *websocket.Conn) { | |
d.listeners = append(d.listeners, conn) | |
} | |
// Either does nothing, adds a new event to the current list of events, or | |
// sends Aggregator the slice of events. | |
func (d *disperser) listen() { | |
for true { | |
select { | |
case output := <-d.aggregatorInput: | |
old := d.events | |
d.events = []int{} | |
output <- old | |
case event := <-d.eventInput: | |
d.events = append(d.events, event) | |
// fmt.Printf("received event %d\n", event) | |
default: | |
// No message! do nothing | |
} | |
} | |
} | |
func main() { | |
d := newDisperser() | |
defer func() { | |
ce := make(chan []int) | |
d.aggregatorInput <- ce | |
fmt.Printf("got %d events, homie\n", len(<-ce)) | |
}() | |
go d.listen() | |
go d.getMessages() | |
go func() { | |
for i := 0; i < 500000; i++ { | |
d.eventInput <- i | |
} | |
time.Sleep(10 * time.Second) | |
d.cancellation <- true | |
}() | |
<-d.cancellation | |
fmt.Println("Received from d.cancellation, so dyin'") | |
} | |
// Every second, aggregates all the events for the last second. | |
func (d *disperser) getMessages() { | |
for true { | |
time.Sleep(time.Second) | |
ce := make(chan []int) | |
d.aggregatorInput <- ce | |
fmt.Printf("got %d events, homie\n", len(<-ce)) | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment