Created
August 25, 2018 12:20
-
-
Save bbengfort/9b152a12a0291c5b5d403cbe6c8202ad to your computer and use it in GitHub Desktop.
Aggregating reads from a go channel
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"fmt" | |
"strings" | |
"time" | |
) | |
var ( | |
count uint64 | |
msgs chan string | |
done chan bool | |
printed []string | |
pout bool | |
) | |
//=========================================================================== | |
// Consumer Methods | |
//=========================================================================== | |
// consume messages one at a time without aggregation | |
func consumeEach() { | |
for { | |
select { | |
case s := <-msgs: | |
handle(s, 1) | |
case <-done: | |
return | |
} | |
} | |
} | |
// consume while aggregating similar messages but does not aggregate the | |
// next message even if it is possible to. | |
func consumeAggregate() { | |
var current, next string | |
var icount int | |
for { | |
select { | |
case current = <-msgs: | |
// we now have one count of current | |
icount = 1 | |
grouper: | |
// continue performing non blocking reads until | |
for { | |
select { | |
case next = <-msgs: | |
if next != current { | |
// we got a different message next, handle both | |
// note that next will not be grouped upon however | |
break grouper | |
} else { | |
// we got a duplicate message, increment icount and continue | |
icount++ | |
} | |
default: | |
// no message pending on the channel, break but don't handle next | |
next = "" | |
break grouper | |
} | |
} | |
case <-done: | |
// done consuming | |
return | |
} | |
// handle all the current messages | |
handle(current, icount) | |
// handle next if one exists | |
if next != "" { | |
handle(next, 1) | |
} | |
} | |
} | |
// aggregates similar messages including the next message if possible | |
func consmeAggregateNext() { | |
var current, next string | |
icount := 1 | |
grouper: | |
for { | |
select { | |
case next = <-msgs: | |
if next == current && current != "" { | |
// Found another member of the group, continue the for loop to see if there is another | |
icount++ | |
continue grouper | |
} else { | |
current = next | |
icount = 1 | |
continue grouper | |
} | |
case <-done: | |
return | |
default: | |
next = "" | |
} | |
// executes if the for loop has not been continued | |
if current != "" { | |
handle(current, icount) | |
current = next | |
icount = 1 | |
} | |
} | |
} | |
//=========================================================================== | |
// Main Method | |
//=========================================================================== | |
func main() { | |
// Create channels to listen on | |
msgs = make(chan string, 100) | |
done = make(chan bool, 1) | |
// Print out rather than buffer | |
pout = true | |
// Create workers to send messages | |
go dynamo("a", msgs, 5, time.Millisecond*5) | |
go dynamo("a", msgs, 5, time.Millisecond*5) | |
go dynamo("a", msgs, 5, time.Millisecond*5) | |
go dynamo("a", msgs, 5, time.Millisecond*5) | |
go dynamo("a", msgs, 5, time.Millisecond*5) | |
go dynamo("b", msgs, 10, time.Millisecond*40) | |
go dynamo("b", msgs, 10, time.Millisecond*40) | |
go dynamo("c", msgs, 5, time.Millisecond*80) | |
// Close the messages channel eventually | |
time.AfterFunc(time.Second*1, func() { done <- true }) | |
// Consume one at a time as they come in. | |
// consumeEach() | |
// Aggregate messages on the stream with non-blocking reads | |
consumeAggregate() | |
// Use intermediate channel for aggregation | |
// consmeChanAggregator() | |
fmt.Printf("%d messages read of 50 sent\n", count) | |
} | |
//=========================================================================== | |
// Helper Functions | |
//=========================================================================== | |
// dynamo sends N messages (char) to the channel with specified delay | |
func dynamo(char string, msgs chan<- string, N int, delay time.Duration) { | |
for i := 0; i < N; i++ { | |
msgs <- char | |
time.Sleep(delay) | |
} | |
} | |
// handle messages and aggregations of messages | |
func handle(msg string, num int) { | |
if num > 1 { | |
msg = strings.Repeat(msg, num) | |
} | |
count += uint64(num) // count the number of messages handled | |
if pout { | |
fmt.Println(msg) // print the groups of messages | |
} else { | |
printed = append(printed, msg) // store what we've printed for testing | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"testing" | |
"time" | |
) | |
// sets up the test state | |
func setup() { | |
msgs = make(chan string, 100) | |
done = make(chan bool, 1) | |
printed = make([]string, 0, 50) | |
count = 0 | |
} | |
// runs the test workers | |
func runWorkers() { | |
go dynamo("a", msgs, 5, time.Millisecond*5) | |
go dynamo("a", msgs, 5, time.Millisecond*5) | |
go dynamo("a", msgs, 5, time.Millisecond*5) | |
go dynamo("a", msgs, 5, time.Millisecond*5) | |
go dynamo("a", msgs, 5, time.Millisecond*5) | |
go dynamo("b", msgs, 10, time.Millisecond*40) | |
go dynamo("b", msgs, 10, time.Millisecond*40) | |
go dynamo("c", msgs, 5, time.Millisecond*80) | |
// Close the messages channel eventually | |
time.AfterFunc(time.Second*1, func() { done <- true }) | |
} | |
// checks the output of all tests to ensure correct execution | |
func checkOutput(t *testing.T) { | |
if count != 50 { | |
t.Errorf("only recieved %d of 50 messages", count) | |
} | |
aggregations := 0 | |
letters := make(map[rune]int) | |
for _, s := range printed { | |
if len(s) > 1 { | |
aggregations++ | |
} | |
fchar := rune(s[0]) | |
for _, c := range s { | |
if c != fchar { | |
t.Errorf("incorrect aggregation: '%s'", s) | |
} | |
letters[c]++ | |
} | |
} | |
if letters['a'] != 25 { | |
t.Errorf("saw %d a events of 25", letters['a']) | |
} | |
if letters['b'] != 20 { | |
t.Errorf("saw %d b events of 20", letters['b']) | |
} | |
if letters['c'] != 5 { | |
t.Errorf("saw %d c events of 5", letters['c']) | |
} | |
t.Logf("discovered %d aggregations", aggregations) | |
} | |
func TestConsumeEach(t *testing.T) { | |
// sets up the test state | |
setup() | |
// Create workers to send messages | |
runWorkers() | |
// Consume one at a time as they come in. | |
consumeEach() | |
// Make sure that consume each worked correctly | |
checkOutput(t) | |
} | |
func TestConsumeAggregate(t *testing.T) { | |
// sets up the test state | |
setup() | |
// Create workers to send messages | |
runWorkers() | |
// Consume aggregates without aggregating next. | |
consumeAggregate() | |
// Make sure that consume each worked correctly | |
checkOutput(t) | |
} | |
func TestConsmeAggregateNext(t *testing.T) { | |
// sets up the test state | |
setup() | |
// Create workers to send messages | |
runWorkers() | |
// Consume aggregates while aggregating next. | |
consmeAggregateNext() | |
// Make sure that consume each worked correctly | |
checkOutput(t) | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment