Instantly share code, notes, and snippets.

Embed
What would you like to do?
Aggregating reads from a Go channel
package main
import (
"fmt"
"strings"
"time"
)
// Package-level state shared by the consumers, the helpers, and the tests.
var (
// count is the running total of messages handled; handle adds num to it.
count uint64
// msgs is the channel the dynamo workers send on and the consumers read from.
msgs chan string
// done tells a consumer to stop; the consumers return when a value arrives.
done chan bool
// printed collects the handled groups when pout is false (used by the tests).
printed []string
// pout selects printing to stdout (true) over buffering into printed (false).
pout bool
)
//===========================================================================
// Consumer Methods
//===========================================================================
// consume messages one at a time without aggregation
func consumeEach() {
for {
select {
case s := <-msgs:
handle(s, 1)
case <-done:
return
}
}
}
// consumeAggregate blocks until a message arrives on msgs, then performs
// non-blocking reads to count consecutive duplicates of that message. The
// group ends when the channel is momentarily empty or when a different
// message is read. The group is passed to handle with its count; a differing
// message that ended the group is handled on its own with a count of 1 and is
// not itself aggregated with what follows. Returns when a value arrives on
// done.
//
// Whether a differing message was read is tracked with an explicit flag
// rather than by comparing against "", so an empty-string message is handled
// like any other instead of being dropped.
func consumeAggregate() {
	for {
		var current string
		select {
		case current = <-msgs:
			// we now have one count of current
		case <-done:
			// done consuming
			return
		}

		icount := 1
		var next string
		hasNext := false

	grouper:
		// continue performing non-blocking reads until the run of
		// duplicates ends
		for {
			select {
			case m := <-msgs:
				if m == current {
					// duplicate message: extend the group and keep reading
					icount++
					continue
				}
				// a different message ends the group; remember it so it
				// is handled right after the group
				next, hasNext = m, true
				break grouper
			default:
				// no message pending on the channel: the group is complete
				break grouper
			}
		}

		// handle all the current messages
		handle(current, icount)
		// handle next if one was read
		if hasNext {
			handle(next, 1)
		}
	}
}
// consmeAggregateNext aggregates runs of identical messages read from msgs,
// and unlike consumeAggregate it also aggregates the message that ends a run:
// that message becomes the start of the next group.
//
// While no group is pending it blocks on msgs and done. While a group is
// pending it reads without blocking:
//   - an identical message extends the group;
//   - a different message flushes the finished group to handle and starts a
//     new group with the message just read;
//   - an empty channel flushes the group and goes back to blocking;
//   - a value on done flushes the group and returns.
//
// A pending group is identified by icount > 0 rather than by a non-empty
// current, so empty-string messages are aggregated like any others.
func consmeAggregateNext() {
	var (
		current string
		icount  int // number of messages in the pending group; 0 means none
	)
	for {
		if icount == 0 {
			// nothing pending: block instead of spinning on default
			select {
			case current = <-msgs:
				icount = 1
			case <-done:
				return
			}
			continue
		}

		select {
		case next := <-msgs:
			if next == current {
				// found another member of the group
				icount++
				continue
			}
			// a different message ends the group: flush it before
			// starting the new group so no messages are lost
			handle(current, icount)
			current, icount = next, 1
		case <-done:
			// flush what has been collected before stopping
			handle(current, icount)
			return
		default:
			// no message pending on the channel: the group is complete
			handle(current, icount)
			icount = 0
		}
	}
}
//===========================================================================
// Main Method
//===========================================================================
// main wires up the shared channels, starts the message producers, schedules
// the stop signal, runs one consumer to completion, and reports how many
// messages were handled.
func main() {
	// Channels the consumer listens on: a buffered message stream and a
	// buffered stop signal.
	msgs, done = make(chan string, 100), make(chan bool, 1)

	// Print groups as they are handled rather than buffering them.
	pout = true

	// Each entry launches `workers` goroutines that each send `n` copies of
	// `char`, sleeping `delay` after every send.
	producers := []struct {
		char    string
		workers int
		n       int
		delay   time.Duration
	}{
		{char: "a", workers: 5, n: 5, delay: 5 * time.Millisecond},
		{char: "b", workers: 2, n: 10, delay: 40 * time.Millisecond},
		{char: "c", workers: 1, n: 5, delay: 80 * time.Millisecond},
	}
	for _, p := range producers {
		for w := 0; w < p.workers; w++ {
			go dynamo(p.char, msgs, p.n, p.delay)
		}
	}

	// Signal the consumer to stop after one second.
	time.AfterFunc(time.Second, func() { done <- true })

	// Aggregate messages on the stream with non-blocking reads. Swap in
	// consumeEach() here to consume one message at a time instead.
	consumeAggregate()

	fmt.Printf("%d messages read of 50 sent\n", count)
}
//===========================================================================
// Helper Functions
//===========================================================================
// dynamo sends char on msgs N times, sleeping for delay after every send
// (including the last one). A non-positive N sends nothing.
func dynamo(char string, msgs chan<- string, N int, delay time.Duration) {
	for remaining := N; remaining > 0; remaining-- {
		msgs <- char
		time.Sleep(delay)
	}
}
// handle processes one group of num identical messages. It adds num to the
// package-level count, renders the group as msg repeated num times (msg is
// left as-is when num is 1 or less), and then either prints the rendered
// group or appends it to printed, depending on pout.
func handle(msg string, num int) {
	// count the number of messages handled
	count += uint64(num)

	group := msg
	if num > 1 {
		group = strings.Repeat(msg, num)
	}

	if !pout {
		// store what would have been printed so tests can inspect it
		printed = append(printed, group)
		return
	}
	fmt.Println(group)
}
package main
import (
"testing"
"time"
)
// sets up the test state
func setup() {
msgs = make(chan string, 100)
done = make(chan bool, 1)
printed = make([]string, 0, 50)
count = 0
}
// runWorkers starts the producer goroutines used by every test and schedules
// the stop signal. Five workers send "a" five times each, two send "b" ten
// times each, and one sends "c" five times, for 50 messages in total.
func runWorkers() {
	// Each entry launches `workers` goroutines that each send `n` copies of
	// `char`, sleeping `delay` after every send.
	producers := []struct {
		char    string
		workers int
		n       int
		delay   time.Duration
	}{
		{char: "a", workers: 5, n: 5, delay: 5 * time.Millisecond},
		{char: "b", workers: 2, n: 10, delay: 40 * time.Millisecond},
		{char: "c", workers: 1, n: 5, delay: 80 * time.Millisecond},
	}
	for _, p := range producers {
		for w := 0; w < p.workers; w++ {
			go dynamo(p.char, msgs, p.n, p.delay)
		}
	}

	// Signal the consumer to stop after one second.
	time.AfterFunc(time.Second, func() { done <- true })
}
// checkOutput verifies the state left behind by a consumer run: count must be
// 50, every entry in printed must consist of a single repeated character, and
// the per-letter totals must be 25 "a", 20 "b", and 5 "c". The number of
// entries holding more than one message is logged.
//
// Entries are walked rune by rune, so an empty entry is skipped instead of
// causing an index-out-of-range panic, and multi-byte characters are compared
// as whole runes rather than against their first byte.
func checkOutput(t *testing.T) {
	if count != 50 {
		t.Errorf("only recieved %d of 50 messages", count)
	}

	aggregations := 0
	letters := make(map[rune]int)
	for _, s := range printed {
		var fchar rune
		size := 0 // number of messages (runes) in this entry
		for i, c := range s {
			if i == 0 {
				// the first rune defines what the group should contain
				fchar = c
			}
			if c != fchar {
				t.Errorf("incorrect aggregation: '%s'", s)
			}
			letters[c]++
			size++
		}
		if size > 1 {
			aggregations++
		}
	}

	if letters['a'] != 25 {
		t.Errorf("saw %d a events of 25", letters['a'])
	}
	if letters['b'] != 20 {
		t.Errorf("saw %d b events of 20", letters['b'])
	}
	if letters['c'] != 5 {
		t.Errorf("saw %d c events of 5", letters['c'])
	}
	t.Logf("discovered %d aggregations", aggregations)
}
// TestConsumeEach runs the shared workers through consumeEach and validates
// the recorded output with checkOutput.
func TestConsumeEach(t *testing.T) {
// reset the shared channels, the printed buffer, and the counter
setup()
// start the workers that send messages and schedule the done signal
runWorkers()
// consume one at a time as they come in; returns once done is signalled
consumeEach()
// make sure that consumeEach handled every message correctly
checkOutput(t)
}
// TestConsumeAggregate runs the shared workers through consumeAggregate and
// validates the recorded output with checkOutput.
func TestConsumeAggregate(t *testing.T) {
// reset the shared channels, the printed buffer, and the counter
setup()
// start the workers that send messages and schedule the done signal
runWorkers()
// consume aggregates without aggregating next; returns once done is signalled
consumeAggregate()
// make sure that consumeAggregate handled every message correctly
checkOutput(t)
}
// TestConsmeAggregateNext runs the shared workers through consmeAggregateNext
// and validates the recorded output with checkOutput.
func TestConsmeAggregateNext(t *testing.T) {
// reset the shared channels, the printed buffer, and the counter
setup()
// start the workers that send messages and schedule the done signal
runWorkers()
// consume aggregates while aggregating next; returns once done is signalled
consmeAggregateNext()
// make sure that consmeAggregateNext handled every message correctly
checkOutput(t)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment