Skip to content

Instantly share code, notes, and snippets.

@deckarep
Created September 6, 2017 04:24
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save deckarep/6a6e44e145a7a5dcb6247531dace6e11 to your computer and use it in GitHub Desktop.
Save deckarep/6a6e44e145a7a5dcb6247531dace6e11 to your computer and use it in GitHub Desktop.
Debouncing Queues in Go
package main
import (
"fmt"
"math/rand"
"os"
"os/signal"
"sync/atomic"
"time"
)
// Package-level state shared between the producer, the consumer and main.
var (
// amountProcessed counts tasks the producer successfully placed on queue.
// It is written and read through sync/atomic.
amountProcessed int32
// amountTossed counts tasks the producer dropped because queue could not
// accept them at the moment of the send. Also accessed through sync/atomic.
amountTossed int32
// queue hands tasks from the producer to the consumer. Its buffer holds a
// single pending task.
queue = make(chan string, 1)
)
// main starts the consumer and producer goroutines, blocks until the process
// receives os.Interrupt, and then prints how many tasks were tossed and how
// many were accepted onto the queue.
func main() {
	go startConsumer()
	go startProducer()

	// The channel is buffered because signal.Notify does not block when
	// delivering a signal; an unbuffered channel could miss it.
	interrupt := make(chan os.Signal, 1)
	signal.Notify(interrupt, os.Interrupt)
	<-interrupt

	// Blank line so the report does not share a line with terminal output.
	fmt.Println()

	report := []struct {
		label   string
		counter *int32
	}{
		{"Amount Tossed: ", &amountTossed},
		{"Amount Processed: ", &amountProcessed},
	}
	for _, entry := range report {
		fmt.Println(entry.label, atomic.LoadInt32(entry.counter))
	}
}
// startConsumer receives tasks from queue one at a time and runs each to
// completion before taking the next, so at most one task is ever in progress.
// It returns only if queue is closed.
func startConsumer() {
	for {
		task, open := <-queue
		if !open {
			return
		}
		timeConsumingWorker(task)
	}
}
// startProducer loops forever, generating tasks far faster than the consumer
// can work through them. On each pass it draws a number in [0, 100) and only
// a draw of 99 produces a task. The task is offered to queue without blocking:
// if queue takes it, amountProcessed is incremented; otherwise the task is
// discarded and amountTossed is incremented.
func startProducer() {
	for {
		if roll := rand.Intn(100); roll >= 99 {
			task := fmt.Sprintf("task: %d", roll)
			select {
			case queue <- task:
				// Accepted: the queue had room for this task.
				atomic.AddInt32(&amountProcessed, 1)
			default:
				// Queue is full; drop the task rather than wait.
				atomic.AddInt32(&amountTossed, 1)
			}
		}

		// Brief pause so the demo does not spin a CPU core.
		time.Sleep(10 * time.Microsecond)
	}
}
// timeConsumingWorker simulates a slow unit of work: it logs the task it was
// given, sleeps for one second, and then logs that it has finished.
func timeConsumingWorker(task string) {
	const workDuration = 1 * time.Second

	fmt.Println("Starting work...for task: ", task)
	time.Sleep(workDuration)
	fmt.Println("Finishing work...")
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment