Skip to content

Instantly share code, notes, and snippets.

@apparentlymart
Last active August 29, 2015 14:05
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save apparentlymart/78ad7dcfc3fa7073f150 to your computer and use it in GitHub Desktop.
Save apparentlymart/78ad7dcfc3fa7073f150 to your computer and use it in GitHub Desktop.
Channel item aggregation in Go
// Example of aggregating channel values into sets based on a key.
// In this case, we aggregate strings by their first byte (the first letter for ASCII input).
// Aggregated groups are emitted as slices when one of the following is true:
// - the buffer capacity is reached (set to 2 in this demo so we can hit it easily with the test data)
// - no new items have shown up for that group for two seconds
// - the source channel is closed
//
// The primary use case for this sort of thing is in forming higher-level transactions from an event stream,
// such as aggregating a log of actions by userid into a set of events that represents a session.
package main
import "fmt"
import "time"
import "container/list"
// Aggregate is one pending group of items that share a key, together with
// the deadline after which the group is emitted if no further item arrives.
type Aggregate struct {
	// Items holds the strings collected for the group so far, in arrival order.
	Items []string
	// FlushTime is the Unix time, in whole seconds, at or after which the
	// group is emitted because of inactivity.
	FlushTime int64
}

// aggregateKey returns the grouping key for item: its first byte as a
// one-byte string, or "" for an empty item so that empty strings form a
// group of their own instead of causing an out-of-range panic.
func aggregateKey(item string) string {
	if item == "" {
		return ""
	}
	return item[:1]
}

// aggregate reads strings from in, groups them by aggregateKey and sends each
// finished group on out as a slice. A group is finished when:
//   - it reaches the group capacity (2 items),
//   - no item has been added to it for about two seconds (deadlines are
//     tracked in whole Unix seconds), or
//   - in is closed, in which case every pending group is emitted, least
//     recently updated first.
//
// out is closed once in has been closed and all pending groups have been
// sent. Sends on out are blocking, so no further input is consumed while a
// group is waiting for its receiver.
func aggregate(in chan string, out chan<- []string) {
	const (
		groupCapacity = 2 // a group is emitted as soon as it holds this many items
		idleSeconds   = 2 // a group is emitted after this many seconds without a new item
	)

	// pending is ordered by recency of update: the front element was touched
	// most recently, the back element has been idle the longest.
	pending := list.New()
	byKey := make(map[string]*list.Element)

	// emit sends the group stored in elem downstream and forgets it.
	emit := func(elem *list.Element) {
		group := elem.Value.(*Aggregate)
		out <- group.Items
		delete(byKey, aggregateKey(group.Items[0]))
		pending.Remove(elem)
	}

	// Create the timer in a stopped state; it is armed only while there is a
	// pending group to wait for. A long initial duration guarantees it cannot
	// fire before Stop is called.
	flushTimer := time.NewTimer(time.Hour)
	flushTimer.Stop()
	defer flushTimer.Stop()

	for {
		select {
		case item, ok := <-in:
			if !ok {
				// Source closed: drain what is left, oldest first, then
				// signal completion to the consumer.
				for elem := pending.Back(); elem != nil; elem = pending.Back() {
					emit(elem)
				}
				close(out)
				return
			}

			key := aggregateKey(item)
			elem, found := byKey[key]
			if !found {
				elem = pending.PushFront(&Aggregate{Items: make([]string, 0, groupCapacity)})
				byKey[key] = elem
			}
			group := elem.Value.(*Aggregate)
			group.Items = append(group.Items, item)

			if len(group.Items) >= groupCapacity {
				// The group is full, so emit it without waiting for the deadline.
				emit(elem)
			} else {
				group.FlushTime = time.Now().Unix() + idleSeconds
				pending.MoveToFront(elem)
			}
		case <-flushTimer.C:
			// Woken up to flush; the work happens below.
		}

		// Emit every group whose deadline has passed. The back of the list is
		// re-read after each removal because a removed element no longer
		// links to its neighbours.
		now := time.Now().Unix()
		for elem := pending.Back(); elem != nil; elem = pending.Back() {
			if elem.Value.(*Aggregate).FlushTime > now {
				break
			}
			emit(elem)
		}

		// Re-arm the timer for the oldest remaining group, if any. Stop and
		// drain first so that a stale tick cannot cause a spurious wake-up.
		if !flushTimer.Stop() {
			select {
			case <-flushTimer.C:
			default:
			}
		}
		if oldest := pending.Back(); oldest != nil {
			wait := oldest.Value.(*Aggregate).FlushTime - time.Now().Unix()
			if wait < 0 {
				wait = 0
			}
			flushTimer.Reset(time.Duration(wait) * time.Second)
		}
	}
}
// main drives a small demo: it feeds a scripted sequence of words into
// aggregate, pausing between sends, and prints every group that comes back.
func main() {
	words := make(chan string)
	groups := make(chan []string)
	finished := make(chan bool)

	go aggregate(words, groups)

	// Printer: show each emitted group until the aggregator closes its output.
	go func() {
		for {
			batch, open := <-groups
			if !open {
				break
			}
			fmt.Println(batch)
		}
		finished <- true
	}()

	// script lists the words to send and how long to wait after each one.
	script := []struct {
		word  string
		pause time.Duration
	}{
		{"apple", 1 * time.Second},
		{"zebra", 1 * time.Second},
		{"anguish", 1 * time.Second},
		{"albatross", 1 * time.Second},
		{"window", 1 * time.Second},
		{"wobble", 5 * time.Second},
		{"anglophile", 0}, // sent right before the source is closed
	}

	fmt.Println("begin")
	for _, step := range script {
		words <- step.word
		if step.pause > 0 {
			time.Sleep(step.pause)
		}
	}
	close(words)

	// Wait for the printer to see the closed output before exiting.
	<-finished
	fmt.Println("the end")
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment