Last active
August 29, 2015 14:05
-
-
Save apparentlymart/78ad7dcfc3fa7073f150 to your computer and use it in GitHub Desktop.
Channel item aggregation in Go
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Example of aggregating channel values into sets based on a key.
// In this case, we aggregate strings by their first letter.
// Aggregated groups are emitted as slices when one of the following is true:
// - the buffer capacity is reached (set to 2 in this demo so we can hit it easily with the test data)
// - no new items have shown up for that group for two seconds
// - the source channel is closed
//
// The primary use case for this sort of thing is in forming higher-level transactions from an event stream,
// such as aggregating a log of actions by user ID into a set of events that represents a session.
package main | |
import "fmt" | |
import "time" | |
import "container/list" | |
// Aggregate is one in-progress group of items that share a grouping key.
type Aggregate struct {
	// Items holds the strings collected for this group so far, in the
	// order they were appended.
	Items []string

	// FlushTime is a Unix timestamp in seconds. Once the current time
	// reaches it, the group is due to be emitted.
	FlushTime int64
}
func aggregate(in chan string, out chan<- []string) { | |
emitQueue := list.New() | |
queueElems := make(map[uint8]*list.Element) | |
flushTimer := time.NewTimer(1) | |
flushTimer.Stop() // We'll start it when we know we need it. | |
closed := false | |
for { | |
select { | |
case item, stillOpen := <-in: | |
if !stillOpen { | |
closed = true | |
break | |
} | |
elem, ok := queueElems[item[0]] | |
if !ok { | |
aggregate := new(Aggregate) | |
aggregate.Items = make([]string, 0, 2) | |
elem = emitQueue.PushFront(aggregate) | |
queueElems[item[0]] = elem | |
} | |
aggregate := elem.Value.(*Aggregate) | |
aggregate.Items = append(aggregate.Items, item) | |
if len(aggregate.Items) == cap(aggregate.Items) { | |
// aggregate is full so we emit it prematurely | |
out <- aggregate.Items | |
delete(queueElems, item[0]) | |
emitQueue.Remove(elem) | |
} | |
emitQueue.MoveToFront(elem) | |
aggregate.FlushTime = time.Now().Unix() + 2 | |
case <-flushTimer.C: | |
} | |
if closed { | |
break | |
} | |
// Set a timer to wake us up when it's time to flush the oldest | |
// aggregate in our queue. We flush aggregates after two seconds | |
// of inactivity | |
oldestElement := emitQueue.Back() | |
now := time.Now().Unix() | |
for { | |
if oldestElement == nil { | |
break | |
} | |
aggregate := oldestElement.Value.(*Aggregate) | |
if aggregate.FlushTime <= now { | |
out <- aggregate.Items | |
delete(queueElems, aggregate.Items[0][0]) | |
emitQueue.Remove(oldestElement) | |
oldestElement = oldestElement.Next() | |
} else { | |
break | |
} | |
} | |
if oldestElement != nil { | |
aggregate := oldestElement.Value.(*Aggregate) | |
timeLeft := aggregate.FlushTime - time.Now().Unix() | |
if timeLeft < 0 { | |
timeLeft = 0 | |
} | |
flushTimer.Reset(time.Duration(timeLeft) * time.Second) | |
} else { | |
flushTimer.Stop() | |
} | |
} | |
for _, elem := range queueElems { | |
aggregate := elem.Value.(*Aggregate) | |
out <- aggregate.Items | |
} | |
close(out) | |
} | |
func main() { | |
source := make(chan string) | |
target := make(chan []string) | |
done := make(chan bool) | |
go aggregate(source, target) | |
go func() { | |
for items := range target { | |
fmt.Println(items) | |
} | |
done <- true | |
}() | |
fmt.Println("begin") | |
source <- "apple" | |
time.Sleep(1 * time.Second) | |
source <- "zebra" | |
time.Sleep(1 * time.Second) | |
source <- "anguish" | |
time.Sleep(1 * time.Second) | |
source <- "albatross" | |
time.Sleep(1 * time.Second) | |
source <- "window" | |
time.Sleep(1 * time.Second) | |
source <- "wobble" | |
time.Sleep(5 * time.Second) | |
source <- "anglophile" | |
close(source) | |
<-done | |
fmt.Println("the end") | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment