Skip to content

Instantly share code, notes, and snippets.

@CheyiLin
Created October 13, 2023 03:21
Show Gist options
  • Save CheyiLin/819747c9a6f8bd7c6286abe0ffab63c1 to your computer and use it in GitHub Desktop.
Save CheyiLin/819747c9a6f8bd7c6286abe0ffab63c1 to your computer and use it in GitHub Desktop.
Producer and consumer workers with rate limit
package main
import (
"context"
"fmt"
"sync"
"time"
"golang.org/x/time/rate"
)
const (
jobCount = 20
ratePerSecond = 4
workerCount = 4
mainTimeout = time.Second * 3
)
var tb = time.Now()
func elapsed() string {
return fmt.Sprintf("%05f", time.Since(tb).Seconds())
}
func worker(ctx context.Context, id int, ch <-chan int, limiter *rate.Limiter, wg *sync.WaitGroup) {
defer wg.Done()
for {
select {
case <-ctx.Done():
// context canceled
return
case v, ok := <-ch:
if !ok {
// channel closed
return
}
// Wait returns an error if n exceeds the Limiter's burst size, the Context is
// canceled, or the expected wait time exceeds the Context's Deadline.
err := limiter.Wait(ctx)
if err != nil {
fmt.Printf("[job %02d] %v\n", id, err)
return
}
fmt.Printf("[job %02d] value %02d @ %ss\n", id, v, elapsed())
}
}
}
func main() {
ctx, cancel := context.WithTimeout(context.Background(), mainTimeout)
defer cancel()
jobCh := make(chan int)
limiter := rate.NewLimiter(ratePerSecond, workerCount)
wg := sync.WaitGroup{}
for i := 0; i < workerCount; i++ {
wg.Add(1)
go worker(ctx, i, jobCh, limiter, &wg)
}
produce:
for j := 0; j < jobCount; j++ {
select {
case <-ctx.Done():
fmt.Printf("[main] %s\n", ctx.Err())
break produce
case jobCh <- j:
}
}
close(jobCh)
wg.Wait()
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment