Skip to content

Instantly share code, notes, and snippets.

@brianpursley
Last active September 22, 2020 18:07
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save brianpursley/9684f3a04cf4c4a9d0f7c3aaa3913d08 to your computer and use it in GitHub Desktop.
Save brianpursley/9684f3a04cf4c4a9d0f7c3aaa3913d08 to your computer and use it in GitHub Desktop.
Example of using go routines as async producer/consumer workers communicating over a channel
package main
import (
"fmt"
"os"
"os/signal"
"sync"
"syscall"
"time"
)
const (
producerCount = 4
consumerCount = 8
producerFrequency = 1000 * time.Millisecond
consumerSimulatedWorkDuration = 2000 * time.Millisecond
messageChannelSize = 8
timestampFormat = "2006-01-02 15:04:05.000"
)
func main() {
// Channel for producers to communicate with consumers
messages := make(chan string, messageChannelSize)
// Start producer workers
isProducing := true
producerGroup := sync.WaitGroup{}
producerGroup.Add(producerCount)
for i := 0; i < producerCount; i++ {
go func(id int) {
defer producerGroup.Done()
fmt.Printf("Producer %d started\n", id)
num := 0
for isProducing {
time.Sleep(producerFrequency)
m := fmt.Sprintf("%d:%d:%s", id, num, time.Now().Format(timestampFormat))
func() {
for isProducing {
select {
case messages <- m:
fmt.Printf("Producer %d sent message @ %s, channel length: %d --> %s\n", id, time.Now().Format(timestampFormat), len(messages), m)
return
default:
fmt.Println("Channel full, waiting...")
time.Sleep(100 * time.Millisecond)
}
}
}()
num++
}
fmt.Printf("Producer %d stopped\n", id)
}(i)
}
// Start consumer workers
consumerGroup := sync.WaitGroup{}
consumerGroup.Add(consumerCount)
for i := 0; i < consumerCount; i++ {
go func(id int) {
defer consumerGroup.Done()
fmt.Printf("Consumer %d started\n", id)
for m := range messages {
fmt.Printf("Consumer %d received message @ %s, channel length: %d --> %s\n", id, time.Now().Format(timestampFormat), len(messages), m)
time.Sleep(consumerSimulatedWorkDuration)
}
fmt.Printf("Consumer %d stopped\n", id)
}(i)
}
// Wait for Ctrl+C
c := make(chan os.Signal)
signal.Notify(c, os.Interrupt, syscall.SIGTERM)
<-c
// Stop producers
fmt.Println("Stopping producers")
isProducing = false
producerGroup.Wait()
fmt.Println("Stopped producers")
// Stop consumers (closing the channel will allow them to process any pending messages before they exit)
fmt.Println("Stopping consumers")
close(messages)
consumerGroup.Wait()
fmt.Println("Stopped consumers")
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment