Skip to content

Instantly share code, notes, and snippets.

@rohanthewiz
Last active April 12, 2017 22:06
Show Gist options
  • Save rohanthewiz/73f68f6af94f88843ac2bc43317d2448 to your computer and use it in GitHub Desktop.
Save rohanthewiz/73f68f6af94f88843ac2bc43317d2448 to your computer and use it in GitHub Desktop.
A Pattern for concurrency using Wait Groups and Buffered Channels
// A Concurrency Pattern using Wait Groups and Buffered Channels
package main
import (
"time"
"fmt"
"sync"
)
// Synopsis
// We create a buffered channel that will receive the basic unit of processing. Here we use a slice (array in other langs) of strings
// An unbuffered "done" channel is used to signal when the poller has completed its processing
// processChannelSize is the buffer capacity, in slices, used when the
// work channel is created.
const processChannelSize = 2000

var (
	// process_strings_channel carries one []string per unit of work from
	// the producers to the poller. It starts out nil and must be created
	// with make before anything sends on it.
	process_strings_channel chan []string

	// generate_strings_wait_group counts the producer goroutines that have
	// not yet finished.
	generate_strings_wait_group = &sync.WaitGroup{}
)
// main wires the pipeline together: it creates the buffered work channel,
// starts one poller, fans out the producers, and then shuts down in order
// (producers finish -> channel closed -> poller reports done).
func main() {
	const producers = 5

	process_strings_channel = make(chan []string, processChannelSize)
	pollerDone := make(chan bool) // unbuffered: the poller's final send pairs with our receive below

	// Start the consumer that listens on the work channel.
	go pollForStrings(process_strings_channel, pollerDone)

	// Register every producer with the wait group, then launch them.
	generate_strings_wait_group.Add(producers)
	for id := 0; id < producers; id++ {
		go generateStrings(id)
	}

	// Block until the count of outstanding producers drops to zero.
	generate_strings_wait_group.Wait()

	// No more sends will happen; closing lets the poller see end-of-stream
	// once it has drained whatever is still buffered.
	close(process_strings_channel)

	// Wait for the poller to report that it has finished.
	<-pollerDone
}
func generateStrings(gr_num int) {
gr := fmt.Sprintf("GR:%d", gr_num)
arr_str := []string{
gr, "cat", "dog", "mouse",
}
// Send to the channel
// This will block if the channel is full i.e. at processChannelSize capacity
fmt.Println(gr, "sending...")
process_strings_channel <- arr_str // send to the channel.
generate_strings_wait_group.Done() // decrement the number of outstanding generate_string goroutines
}
// pollForStrings consumes []string messages from process_strings_channel and
// prints every element of each one.
//
// It returns when either
//   - the channel has been closed and fully drained, or
//   - no message has arrived for 10 seconds.
//
// In both cases it sends true on done just before returning, so done must
// have a receiver (or buffer space) or this function will block on exit.
// The input and signal channels are passed in so that this function could
// live in a separate package.
func pollForStrings(process_strings_channel <-chan []string, done chan<- bool) {
	const idleTimeout = 10 * time.Second

	// Signal the caller on every return path.
	defer func() { done <- true }()

	// One timer is reused for the whole loop. Calling time.After on each
	// iteration would allocate a new timer per message, each of which can
	// stay alive until it fires.
	idle := time.NewTimer(idleTimeout)
	defer idle.Stop()

	for {
		select { // wakes on whichever happens first: a message, close, or the idle timeout
		case attrs, ok := <-process_strings_channel:
			if !ok {
				// Channel is closed *and* empty: nothing more will arrive.
				return
			}

			fmt.Println("Poller received new array of strings")
			for _, attr := range attrs {
				fmt.Println(attr)
			}

			// Restart the idle countdown for the next wait. If the timer
			// already fired while we were working, discard the stale tick
			// (without blocking) so it cannot trigger a false timeout.
			if !idle.Stop() {
				select {
				case <-idle.C:
				default:
				}
			}
			idle.Reset(idleTimeout)

		case <-idle.C:
			fmt.Println("Timeout waiting for messages")
			return
		}
	}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment