@AntoineAugusti
Last active March 1, 2024 10:44
Limit the maximum number of goroutines running at the same time
package main

import (
	"flag"
	"fmt"
	"time"
)

// Fake a long and difficult work.
func DoWork() {
	time.Sleep(500 * time.Millisecond)
}

func main() {
	maxNbConcurrentGoroutines := flag.Int("maxNbConcurrentGoroutines", 5, "the number of goroutines that are allowed to run concurrently")
	nbJobs := flag.Int("nbJobs", 100, "the number of jobs that we need to do")
	flag.Parse()

	// Dummy channel to coordinate the number of concurrent goroutines.
	// This channel should be buffered otherwise we will be immediately blocked
	// when trying to fill it.
	concurrentGoroutines := make(chan struct{}, *maxNbConcurrentGoroutines)
	// Fill the dummy channel with maxNbConcurrentGoroutines empty struct.
	for i := 0; i < *maxNbConcurrentGoroutines; i++ {
		concurrentGoroutines <- struct{}{}
	}

	// The done channel indicates when a single goroutine has
	// finished its job.
	done := make(chan bool)
	// The waitForAllJobs channel allows the main program
	// to wait until we have indeed done all the jobs.
	waitForAllJobs := make(chan bool)

	// Collect all the jobs, and since the job is finished, we can
	// release another spot for a goroutine.
	go func() {
		for i := 0; i < *nbJobs; i++ {
			<-done
			// Say that another goroutine can now start.
			concurrentGoroutines <- struct{}{}
		}
		// We have collected all the jobs, the program
		// can now terminate
		waitForAllJobs <- true
	}()

	// Try to start nbJobs jobs
	for i := 1; i <= *nbJobs; i++ {
		fmt.Printf("ID: %v: waiting to launch!\n", i)
		// Try to receive from the concurrentGoroutines channel. When we have something,
		// it means we can start a new goroutine because another one finished.
		// Otherwise, it will block the execution until an execution
		// spot is available.
		<-concurrentGoroutines
		fmt.Printf("ID: %v: it's my turn!\n", i)
		go func(id int) {
			DoWork()
			fmt.Printf("ID: %v: all done!\n", id)
			done <- true
		}(i)
	}

	// Wait for all jobs to finish
	<-waitForAllJobs
}
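
The token channel above can also be written more compactly. What follows is a minimal sketch, not the gist author's code: it assumes a hypothetical helper named runLimited that uses a buffered channel as a counting semaphore (a slot is taken by sending and freed by receiving) and a sync.WaitGroup in place of the done and waitForAllJobs channels. The peak counter is only there to make the limit observable.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// runLimited is a hypothetical helper (not part of the gist). It calls work
// nbJobs times with at most limit calls in flight at once, and returns the
// highest number of simultaneously running jobs it observed.
func runLimited(nbJobs, limit int, work func(id int)) int64 {
	slots := make(chan struct{}, limit) // buffered channel used as a counting semaphore
	var wg sync.WaitGroup
	var running, peak int64

	for i := 1; i <= nbJobs; i++ {
		slots <- struct{}{} // blocks while limit jobs are already running
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			defer func() { <-slots }() // free the slot once the job has ended

			n := atomic.AddInt64(&running, 1)
			// Record the peak with a compare-and-swap loop.
			for {
				p := atomic.LoadInt64(&peak)
				if n <= p || atomic.CompareAndSwapInt64(&peak, p, n) {
					break
				}
			}
			work(id)
			atomic.AddInt64(&running, -1)
		}(i)
	}

	wg.Wait() // wait for all jobs to finish
	return atomic.LoadInt64(&peak)
}

func main() {
	peak := runLimited(20, 5, func(id int) {
		time.Sleep(10 * time.Millisecond) // stand-in for DoWork
	})
	fmt.Println("peak concurrency:", peak)
}
```

Compared with the original, the collector goroutine disappears because each worker frees its own slot. The trade-off is that the launching loop and the workers now share the same channel in opposite directions, which some readers find less explicit than the separate done channel.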
@freakynit

Hey @kichik, thanks for this. Any idea about the performance difference vs @crepehat's snippet?

@kichik

kichik commented Mar 8, 2022

I'm not sure about the performance, but it's the easiest and shortest method I found.

@freakynit

Cool... thanks for sharing.... 👍

@qxxt

qxxt commented Jul 10, 2022

@freakynit

1. Wastage of CPU cycles. Example: for the example you have shown, it'll end up spending almost all of the time in halt() without doing actual useful work

What do you mean? What about changing the waiting time?

2. Stackoverflow error due to potentially infinite recursive call to halt()

Yeah 👍 but there is only one routine recursively calling halt(). Anyway, I use a for loop now.

I didn't use that code, this is pretty much what I'm using now:

package main

import (
	"fmt"
	"runtime"
	"time"
)

var maxProc = 5

func halt() {
	for runtime.NumGoroutine() > maxProc {
		fmt.Println("sleeping...")
		time.Sleep(time.Second)
	}
}

func main() {
	for i := 0; i < 20; i++ {
		go fmt.Println(i)
		halt()
	}
}

I'm not into the low-level stuff yet, so I might be oversimplifying here. It would be helpful if you could elaborate :).

@freakynit

Hey @qxxt, for your earlier code version:

  1. Suppose 2 goroutines have been launched and, while launching the 3rd, halt is invoked and starts to wait for a second. If one of the running goroutines completes in the very next millisecond, you end up wasting 999ms. This might seem small, but when heavy concurrency is desired, each wasted millisecond counts.
  2. For long-running goroutines: suppose one makes a network call with a timeout of 60 seconds and, on error, retries it 5 times with exponential backoff starting at 2 seconds (a very real scenario). This will take approx. 2 minutes to either succeed or return an error. In either case, your halt function would have recursively called itself 120/1 = 120 times. That is pretty small, but if you reduce the sleep time to, say, 100ms (to reduce the wasted waiting time), it would now recurse 1200 times, which is not so small. Each recursion takes up stack space. Although Go grows the stack as needed, bigger stacks generally slow things down a lot.

Hope I was able to explain correctly...
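
Both points above go away if the launcher blocks until a slot is actually freed instead of sleeping and re-checking. Here is one way that could look, as a minimal sketch under stated assumptions: the Limiter type and its NewLimiter, Acquire and Release names are hypothetical (they are not from this thread or from any library mentioned in it), and it is built on sync.Mutex and sync.Cond from the standard library.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// Limiter is a hypothetical type for illustration: it caps the number of
// jobs running at once without polling and without recursion.
type Limiter struct {
	mu      sync.Mutex
	cond    *sync.Cond
	running int
	max     int
}

// NewLimiter returns a Limiter that allows at most max concurrent jobs.
func NewLimiter(max int) *Limiter {
	l := &Limiter{max: max}
	l.cond = sync.NewCond(&l.mu)
	return l
}

// Acquire blocks until fewer than max jobs are running, then takes a slot.
// The wait is a plain loop, so the stack does not grow while waiting.
func (l *Limiter) Acquire() {
	l.mu.Lock()
	for l.running >= l.max {
		l.cond.Wait() // sleeps until Release signals; no fixed sleep interval
	}
	l.running++
	l.mu.Unlock()
}

// Release frees a slot and wakes one waiting Acquire, if any.
func (l *Limiter) Release() {
	l.mu.Lock()
	l.running--
	l.mu.Unlock()
	l.cond.Signal()
}

func main() {
	l := NewLimiter(5)
	var wg sync.WaitGroup
	for i := 0; i < 20; i++ {
		l.Acquire()
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			defer l.Release()
			time.Sleep(10 * time.Millisecond) // stand-in for real work
			fmt.Println(id)
		}(i)
	}
	wg.Wait()
}
```

Unlike a check on runtime.NumGoroutine(), this counts only the jobs that went through Acquire, so unrelated goroutines in the program do not affect the limit.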

@ajinabraham

ajinabraham commented Aug 7, 2022

I did some benchmarking, if anyone is interested: parsing diff patches and running some regex checks on files from around 2000 git commits in a local repo. exec.Command() is the bottleneck, as it panics with "open /dev/null: too many open files" due to the soft ulimit when the number of goroutines is uncontrolled.

Experiment | Execution Time
Without Goroutine | 3m33.714605387s
With Goroutine, following @crepehat snippet, with maxNbConcurrentGoroutines set to 50 | 35.2966391
With Goroutine, checking runtime.NumGoroutine() > 80 and adding 1s sleep | 35.2392869
With Goroutine and semaphore NewWeighted(12) | 28.851024029s
With limiter, NewConcurrencyLimiter set to 50 | 21.278964931s
go 1.17
MacBook Pro (16-inch, 2019)
2.6 GHz 6-Core Intel i7 | 32 GB RAM

@ajinabraham

A bit more benchmarking, this time done on a level playing field.

Function execution times measured from Go with different goroutine concurrency limits (Average of 3 runs)

Sample | Limit 12 | Limit 50 | Limit 80
Atomic | 23.86s | 23.40s | 27.88s
Semaphore | 29.54s | 29.17s | 35.38s
WaitGroup | 28.62s | 31.15s | 38.26s
WaitGroup with Sleep | 2m 56.85s | 47.67s | 37.93s
Without Goroutine | 3m 33.71s | NA | NA

Go program’s total execution time measured using zsh’s time with different concurrency limits (average of 3 runs)

Sample | Limit 12 | Limit 50 | Limit 80
Atomic | 29.900 | 34.190 | 42.535
Semaphore | 30.275 | 30.406 | 36.771
WaitGroup | 29.464 | 32.382 | 39.660
WaitGroup with Sleep | 2m 56.8521729s | 48.905 | 39.100
