Bounded Parallel Get Requests in Golang
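Three standalone Go programs follow, each bounding how many goroutines do work at once: a fixed pool of channel-fed workers, the original pget.go (which this gist was forked from) limiting parallel HTTP GETs with a semaphore channel, and a jobs/results worker pool.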
package main

import (
	"fmt"
	"sync"
)

const maxConcurrency = 5 // maximum number of goroutines doing work at once

func doSomething(a int) {
	fmt.Println("Doing something...", a)
}

func main() {
	// A buffer of 50 lets every job below be queued without blocking the
	// producer; a smaller or even unbuffered channel would also work, the
	// sends would just block until a worker frees up.
	var ch = make(chan int, 50)
	var wg sync.WaitGroup

	// start maxConcurrency goroutines that wait for something to do
	wg.Add(maxConcurrency)
	for i := 0; i < maxConcurrency; i++ {
		go func() {
			for {
				a, ok := <-ch
				if !ok { // the channel has been drained and closed, so end the goroutine
					wg.Done()
					return
				}
				doSomething(a) // do something
			}
		}()
	}

	// add the jobs to the channel, which acts as a queue
	for i := 0; i < 50; i++ {
		ch <- i // add i to the queue
	}

	close(ch) // this tells the goroutines there's nothing else to do
	wg.Wait() // wait for the goroutines to finish
}
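The receive-and-check loop above is the manual form of a pattern Go expresses more directly with a range loop over the channel, which exits once the channel is drained and closed. A minimal equivalent sketch, standard library only:

package main

import (
	"fmt"
	"sync"
)

func main() {
	const maxConcurrency = 5

	ch := make(chan int) // unbuffered: the producer simply blocks until a worker is free
	var wg sync.WaitGroup

	for i := 0; i < maxConcurrency; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			// range exits on its own once ch is drained and closed,
			// replacing the manual ok-check
			for a := range ch {
				fmt.Println("Doing something...", a)
			}
		}()
	}

	for i := 0; i < 50; i++ {
		ch <- i
	}
	close(ch)
	wg.Wait()
}

The second program applies the same bounding idea to HTTP: a buffered channel of empty structs acts as a semaphore around parallel GET requests, and an index carried through each result lets the responses be re-sorted into request order.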
package main

import (
	"fmt"
	"net/http"
	"sort"
	"time"
)

// a struct to hold the result from each request, including an index
// which will be used for sorting the results after they come in; the
// response is held as a pointer because http.Get returns nil on error
type result struct {
	index int
	res   *http.Response
	err   error
}
// boundedParallelGet sends requests in parallel, but only up to a certain
// limit; it is parallel only up to the number of CPUs, but always
// concurrent up to the concurrency limit
func boundedParallelGet(urls []string, concurrencyLimit int) []result {

	// this buffered channel will block at the concurrency limit
	semaphoreChan := make(chan struct{}, concurrencyLimit)

	// this channel will not block and collects the http request results
	resultsChan := make(chan *result)

	// make sure we close these channels when we're done with them
	defer func() {
		close(semaphoreChan)
		close(resultsChan)
	}()

	// keep an index and loop through every url we will send a request to
	for i, url := range urls {

		// start a goroutine with the index and url in a closure
		go func(i int, url string) {

			// sending an empty struct into the semaphoreChan adds one to
			// the limit; once the limit has been reached, this blocks
			// until there is room
			semaphoreChan <- struct{}{}

			// send the request and put the response in a result struct,
			// along with the index so we can sort them later, and any
			// error that might have occurred; res is stored as a pointer
			// because it is nil whenever err is non-nil
			res, err := http.Get(url)
			result := &result{i, res, err}

			// now we can send the result struct through the resultsChan
			resultsChan <- result

			// once we're done, reading from the semaphoreChan removes
			// one from the limit, allowing another goroutine to start
			<-semaphoreChan

		}(i, url)
	}
	// make a slice to hold the results we're expecting
	var results []result

	// start listening for any results over the resultsChan;
	// once we get a result append it to the results slice
	for {
		result := <-resultsChan
		results = append(results, *result)

		// if we've received the expected number of results then stop
		if len(results) == len(urls) {
			break
		}
	}

	// let's sort these results real quick
	sort.Slice(results, func(i, j int) bool {
		return results[i].index < results[j].index
	})

	// now we're done, return the results
	return results
}
// we'll use the init function to set up the benchmark
// by making a slice of 100 URLs to send requests to
var urls []string

func init() {
	for i := 0; i < 100; i++ {
		urls = append(urls, "http://httpbin.org/get")
	}
}
// the main function sets up an anonymous benchmark func
// that will time how long it takes to get all the URLs
// at the specified concurrency level
//
// you should see something like the following printed,
// depending on how fast your computer and connection are:
//
// 5 bounded parallel requests: 100/100 in 5.533223255
// 10 bounded parallel requests: 100/100 in 2.5115351219
// 25 bounded parallel requests: 100/100 in 1.189462884
// 50 bounded parallel requests: 100/100 in 1.17430002
// 75 bounded parallel requests: 100/100 in 1.001383863
// 100 bounded parallel requests: 100/100 in 1.3769354
func main() {
	benchmark := func(urls []string, concurrency int) string {
		startTime := time.Now()
		results := boundedParallelGet(urls, concurrency)
		seconds := time.Since(startTime).Seconds()
		template := "%d bounded parallel requests: %d/%d in %v"
		return fmt.Sprintf(template, concurrency, len(results), len(urls), seconds)
	}
	fmt.Println(benchmark(urls, 10))
	fmt.Println(benchmark(urls, 25))
	fmt.Println(benchmark(urls, 50))
	fmt.Println(benchmark(urls, 75))
	fmt.Println(benchmark(urls, 100))
}
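One caveat with boundedParallelGet: http.Get has no deadline, so a single slow server can occupy a semaphore slot indefinitely. A minimal sketch of a timeout-aware replacement for that call, assuming a helper name getWithTimeout that is not part of the original gist:

package main

import (
	"fmt"
	"net/http"
	"time"
)

// getWithTimeout is a hypothetical drop-in for the bare http.Get call in
// the goroutine above; http.Client.Timeout covers the whole exchange,
// including connecting, any redirects, and reading the response body.
func getWithTimeout(url string, timeout time.Duration) (*http.Response, error) {
	client := &http.Client{Timeout: timeout}
	return client.Get(url)
}

func main() {
	res, err := getWithTimeout("http://httpbin.org/get", 3*time.Second)
	if err != nil {
		fmt.Println("request failed:", err)
		return
	}
	defer res.Body.Close()
	fmt.Println("status:", res.Status)
}

The third program returns to the fixed-worker-pool shape of the first example, this time with separate jobs and results channels: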
package main

import (
	"fmt"
	"math/rand"
	"sync"
	"time"
)

// Job is a unit of work: an id plus a random number to process.
type Job struct {
	id       int
	randomno int
}

// Result pairs a finished Job with the computed sum of its digits.
type Result struct {
	job         Job
	sumofdigits int
}

var jobs = make(chan Job, 10)
var results = make(chan Result, 10)

// digits sums the decimal digits of number; the sleep simulates an
// expensive computation so the effect of the worker pool is visible.
func digits(number int) int {
	sum := 0
	no := number
	for no != 0 {
		digit := no % 10
		sum += digit
		no /= 10
	}
	time.Sleep(2 * time.Second)
	return sum
}

// worker drains the jobs channel until it is closed, sending each
// computed Result downstream.
func worker(wg *sync.WaitGroup) {
	for job := range jobs {
		output := Result{job, digits(job.randomno)}
		results <- output
	}
	wg.Done()
}

// createWorkerPool starts the workers, waits for them all to finish,
// and then closes the results channel so the printer can exit.
func createWorkerPool(noOfWorkers int) {
	var wg sync.WaitGroup
	for i := 0; i < noOfWorkers; i++ {
		wg.Add(1)
		go worker(&wg)
	}
	wg.Wait()
	close(results)
}

// allocate queues up the jobs and closes the jobs channel when done.
func allocate(noOfJobs int) {
	for i := 0; i < noOfJobs; i++ {
		randomno := rand.Intn(999)
		job := Job{i, randomno}
		jobs <- job
	}
	close(jobs)
}

// result prints each Result as it arrives and signals done at the end.
func result(done chan bool) {
	for result := range results {
		fmt.Printf("Job id %d, input random no %d, sum of digits %d\n", result.job.id, result.job.randomno, result.sumofdigits)
	}
	done <- true
}

func main() {
	startTime := time.Now()
	noOfJobs := 100
	go allocate(noOfJobs)
	done := make(chan bool)
	go result(done)
	noOfWorkers := 10
	createWorkerPool(noOfWorkers)
	<-done
	endTime := time.Now()
	diff := endTime.Sub(startTime)
	fmt.Println("total time taken ", diff.Seconds(), "seconds")
}
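With 100 jobs, 10 workers, and a 2-second sleep per job, the pool needs roughly (100 / 10) x 2s = 20 seconds of wall-clock time, versus about 200 seconds sequentially; the printed total should land near that figure plus scheduling overhead.

All three programs hand-roll the bounding with channels and a WaitGroup. For comparison, a minimal sketch of the same limit using the external golang.org/x/sync/errgroup module (an assumption here: a version recent enough to include SetLimit):

package main

import (
	"fmt"

	"golang.org/x/sync/errgroup"
)

func main() {
	var g errgroup.Group
	g.SetLimit(10) // at most 10 of the queued functions run at once

	for i := 0; i < 100; i++ {
		i := i // capture the loop variable (needed before Go 1.22)
		g.Go(func() error {
			fmt.Println("processing job", i)
			return nil
		})
	}

	// Wait blocks until every queued function has returned and reports
	// the first non-nil error, if any.
	if err := g.Wait(); err != nil {
		fmt.Println("error:", err)
	}
}

errgroup also propagates the first non-nil error out of g.Wait(), which the hand-rolled versions would have to thread through their results channels themselves.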