Skip to content

Instantly share code, notes, and snippets.

@pzentenoe
Forked from jerryan999/rate-limit-workerpool.go
Created December 14, 2022 12:15
Show Gist options
  • Save pzentenoe/3eee83ffd469aeb8a6c150d164b18790 to your computer and use it in GitHub Desktop.
Save pzentenoe/3eee83ffd469aeb8a6c150d164b18790 to your computer and use it in GitHub Desktop.
package main
import (
"fmt"
"io/ioutil"
"net/http"
"sync"
"time"
"go.uber.org/ratelimit"
)
// Get performs an HTTP GET request against url and reads the entire response
// body into memory.
//
// It returns the body, the HTTP status code, and an error. When the request
// fails before any response is available, the body is nil and the status code
// is 0. When a response was received but its body could not be read, the
// response's status code is returned together with the read error.
func Get(url string) ([]byte, int, error) {
	resp, err := http.Get(url)
	if err != nil {
		// On failure resp is normally nil, so it must not be dereferenced
		// unconditionally. It can be non-nil in the redirect-policy failure
		// case, in which the client has already closed the body.
		code := 0
		if resp != nil {
			code = resp.StatusCode
		}
		return nil, code, err
	}
	defer resp.Body.Close()

	body, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		return nil, resp.StatusCode, err
	}
	return body, resp.StatusCode, nil
}
type Task struct {
seq int
url string
data []byte
err string
statusCode int
duration time.Duration
handleBy string
}
func (t Task) String() string {
return fmt.Sprintf("seq: %d, url: %s, handleBy: %s ,duration: %d, statusCode: %d, err: %s ...", t.seq, t.url, t.handleBy, t.duration, t.statusCode, t.err)
}
// worker receives tasks from taskChan until that channel is closed. For each
// task it fetches task.url with Get, records the outcome on its copy of the
// task (body, status code, error text, worker name, elapsed milliseconds) and
// forwards the completed task on resultChan.
func worker(name string, taskChan <-chan Task, resultChan chan<- Task) {
	for {
		job, open := <-taskChan
		if !open {
			// The producer closed the channel: no more work.
			return
		}

		began := time.Now()
		payload, status, fetchErr := Get(job.url)
		if fetchErr != nil {
			job.err = fetchErr.Error()
		}
		job.statusCode, job.data, job.handleBy = status, payload, name

		// The elapsed time is stored as a plain millisecond count inside a
		// time.Duration, matching how Task.String prints it with %d.
		elapsedMs := time.Since(began).Milliseconds()
		job.duration = time.Duration(elapsedMs)

		resultChan <- job
	}
}
// producer builds a fixed list of 1000 tasks targeting httpbin.org and returns
// a channel on which they are delivered. A background goroutine calls
// rl.Take() before each send, so delivery is paced by the limiter, and closes
// the channel once every task has been sent.
func producer(rl ratelimit.Limiter) <-chan Task {
	const total = 1000

	tasks := make([]Task, total)
	for n := range tasks {
		tasks[n] = Task{
			seq: n,
			url: "https://httpbin.org/get?i=" + fmt.Sprint(n),
		}
	}
	fmt.Println("total urls: ", len(tasks))

	out := make(chan Task)
	go func() {
		// Closing out is what lets the workers' receive loops terminate.
		defer close(out)
		for idx := range tasks {
			rl.Take()
			out <- tasks[idx]
		}
	}()
	return out
}
// main wires the pipeline together: a rate-limited producer feeds a pool of
// workers, and every finished task is printed as it arrives.
func main() {
	// The limiter is configured as 500 operations per 60-second window.
	limiter := ratelimit.New(500, ratelimit.Per(60*time.Second))
	tasks := producer(limiter)
	results := make(chan Task)

	// Start a fixed pool of 5 workers sharing the same task and result channels.
	const workerCount = 5
	var pool sync.WaitGroup
	pool.Add(workerCount)
	for id := 0; id < workerCount; id++ {
		// name is declared per iteration, so each goroutine sees its own value.
		name := fmt.Sprintf("worker-%d", id)
		go func() {
			defer pool.Done()
			worker(name, tasks, results)
		}()
	}

	// Close results only after every worker has returned; without this the
	// printing loop below would block forever waiting for more results.
	go func() {
		pool.Wait()
		close(results)
	}()

	for finished := range results {
		fmt.Println(finished)
	}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment