Skip to content

Instantly share code, notes, and snippets.

@manadart
Last active August 29, 2015 14:06
Show Gist options
  • Save manadart/5e6431122ee54bbabb92 to your computer and use it in GitHub Desktop.
Save manadart/5e6431122ee54bbabb92 to your computer and use it in GitHub Desktop.
Go Concurrent Slice Blog Post
// Payload type extending a slice of items.
type Payload []*itemType
// Use a pool of Goroutines to run the input action against all the items, with the input concurrency factor.
func (payload Payload) ExecuteAction(action func(*itemType), concFactor int) {
itemsChl := make(chan *itemType)
wg := new(sync.WaitGroup)
wg.Add(concFactor)
// Set up the pool with a number of Goroutines equal to our concurrency factor.
wrapped := ItemActionWrapper(action)
for i := 0; i < concFactor; i++ {
go wrapped(itemsChl, wg)
}
// Send the items.
for _, i := range payload {
itemsChl <- i
}
// Close the channel and wait for the workers to finish.
close(itemsChl)
wg.Wait()
}
// Executes the wrapped action while the channel is open, then notifies the WaitGroup of completion.
func ItemActionWrapper(action func(*itemType)) func(chan *itemType, *sync.WaitGroup) {
return func(itemsChl chan *itemType, wg *sync.WaitGroup) {
defer wg.Done()
for i := range itemsChl {
action(i)
}
}
}
func someItemAction(item *itemType) {
// Do something.
}
func anotherItemAction(item *itemType) {
// Do another thing.
}
func main() {
var payload Payload
// Populate somehow.
payload.ExecuteAction(someItemAction, 8)
payload.ExecuteAction(anotherItemAction, 16)
// Do something else with the payload.
}
func worker(itemsChl chan *itemType, wg *sync.WaitGroup) {
defer wg.Done()
for item := range itemsChl {
// Do something with item.
}
}
func main() {
sliceOfItems := make(*itemType, 100)
// Populate the slice somehow.
itemsChl := make(chan *itemType)
wg := new(sync.WaitGroup)
// Create the pool of 8 workers and set them watching the channel.
for i := 0; i < 8; i++ {
wg.Add(1)
go worker(itemsChl, wg)
}
// Send all the items down the channel.
for _, item := range sliceOfItems {
itemsChl <- item
}
// Close the channel and wait for the workers to finish.
close(itemsChl)
wg.Wait()
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment