Skip to content

Instantly share code, notes, and snippets.

@abitofhelp
Last active July 28, 2018 02:21
Show Gist options
  • Save abitofhelp/0af1ac31c6a5c18987d630c82838e47d to your computer and use it in GitHub Desktop.
Save abitofhelp/0af1ac31c6a5c18987d630c82838e47d to your computer and use it in GitHub Desktop.
This gist shows the use of a buffered channel with multiple senders and receivers.
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// Copyright (c) 2018 A Bit of Help, Inc. - All Rights Reserved, Worldwide.
// Use of this source code is governed by a MIT license that can be found in the LICENSE file.
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// Package main is the entry point for the application
// and is responsible for configuring the environment.
package main
import (
"fmt"
"math/rand"
"sync"
"sync/atomic"
"time"
)
const (
// Constant kMaxSenders is the number of goroutines that will be sending
// values into the channel in parallel.
kMaxSenders = 25
// Constant kMaxReceivers is the number of goroutines that will be receiving
// values from the channel in parallel.
kMaxReceivers = 100
// Constant kMaxTransmissionsPerSender is the number of items that will be
// sent into the channel by each sending goroutine.
kMaxTransmissionsPerSender = 1000
)
// Function main is the entry point for the application and is responsible
// for configuring its environment.
func main() {
// Variable wg is main's WaitGroup, which detects when all of the
// goroutines that were launched have completed.
var wg sync.WaitGroup
// Start our timer...
start := time.Now()
// Doing something...
doIt(&wg)
// Wait here until all goroutines have completed their work.
wg.Wait()
// Show the duration.
fmt.Printf("Elapsed: %s", time.Since(start))
}
// Function receiver uses a for loop over a range to receive a value from the channel.
// Parameter ch is a unidirectional channel for reading integer values.
// Parameter id is the unique identifier assigned to each receiving goroutine.
// Parameter counter is an atomic count of the number of messages that were received
// through the channel.
func receiver(id uint64, ch <-chan uint64, counter *uint64) {
for val := range ch {
// Increment the received counter, atomically.
cnt := atomic.AddUint64(counter, 1)
fmt.Println("(", cnt, ") Receiver(", id, "), value: ", val)
}
}
// Function sender places 1000 integers into a unidirectional channel for sending integer
// values.
// Parameter ch is a unidirectional channel for reading integer values.
// Parameter id is the unique identifier assigned to each sending goroutine.
// Parameter counter is an atomic count of the number of messages that were sent
// through the channel.
func sender(id uint64, ch chan<- uint64, counter *uint64) {
for i := uint64(0); i < kMaxTransmissionsPerSender; i++ {
// Generate a random value to broadcast to the receivers.
val := rand.Uint64()
// Get the current value of the counter, atomically, and show a status message.
cnt := atomic.LoadUint64(counter)
fmt.Println("(", cnt, ") Sender(", id, "), broadcasting: ", val)
// Send the value through the channel.
ch <- val
// Increment the sent counter, atomically.
atomic.AddUint64(counter, 1)
}
}
// Function doIt does the work. It creates the channel,
// launches the goroutines and returns to main().
// Please note that a single sender is broadcasting values to
// two receivers over the channel.
func doIt(wg *sync.WaitGroup) {
// Atomic counter of the number of items sent.
var sentCounter uint64 = 1
// Atomic counter of the number of items received.
var receivedCounter uint64 = 0
// Variable wgs is a WaitGroup so we know when the senders have completed.
var wgs sync.WaitGroup
// Create the bidirectional channel for communications between
// the goroutines.
ch := make(chan uint64, 100)
// Launch multiple receivers and set up the WaitGroup to account for them.
for r := uint64(0); r < kMaxReceivers; r++ {
wg.Add(1)
go func(id uint64) {
defer wg.Done()
receiver(id, ch, &receivedCounter)
}(r)
}
// Launch multiple senders and set up the WaitGroup to account for them.
for s := uint64(0); s < kMaxSenders; s++ {
wgs.Add(1)
go func(id uint64) {
defer wgs.Done()
sender(id, ch, &sentCounter)
}(s)
}
// Wait here until all sending goroutines have completed their work.
wgs.Wait()
// Close the channel there are no more values to send...
// It signals the receivers that there are no more values,
// so when the channel is empty, they can return and the application
// can terminate.
close(ch)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment