Limit the maximum number of goroutines running at the same time
package main

import (
	"flag"
	"fmt"
	"time"
)

// Fake a long and difficult work.
func DoWork() {
	time.Sleep(500 * time.Millisecond)
}

func main() {
	maxNbConcurrentGoroutines := flag.Int("maxNbConcurrentGoroutines", 5, "the number of goroutines that are allowed to run concurrently")
	nbJobs := flag.Int("nbJobs", 100, "the number of jobs that we need to do")
	flag.Parse()

	// Dummy channel to coordinate the number of concurrent goroutines.
	// This channel should be buffered otherwise we will be immediately blocked
	// when trying to fill it.
	concurrentGoroutines := make(chan struct{}, *maxNbConcurrentGoroutines)
	// Fill the dummy channel with maxNbConcurrentGoroutines empty struct.
	for i := 0; i < *maxNbConcurrentGoroutines; i++ {
		concurrentGoroutines <- struct{}{}
	}

	// The done channel indicates when a single goroutine has
	// finished its job.
	done := make(chan bool)
	// The waitForAllJobs channel allows the main program
	// to wait until we have indeed done all the jobs.
	waitForAllJobs := make(chan bool)

	// Collect all the jobs, and since the job is finished, we can
	// release another spot for a goroutine.
	go func() {
		for i := 0; i < *nbJobs; i++ {
			<-done
			// Say that another goroutine can now start.
			concurrentGoroutines <- struct{}{}
		}
		// We have collected all the jobs, the program
		// can now terminate
		waitForAllJobs <- true
	}()

	// Try to start nbJobs jobs
	for i := 1; i <= *nbJobs; i++ {
		fmt.Printf("ID: %v: waiting to launch!\n", i)
		// Try to receive from the concurrentGoroutines channel. When we have something,
		// it means we can start a new goroutine because another one finished.
		// Otherwise, it will block the execution until an execution
		// spot is available.
		<-concurrentGoroutines
		fmt.Printf("ID: %v: it's my turn!\n", i)
		go func(id int) {
			DoWork()
			fmt.Printf("ID: %v: all done!\n", id)
			done <- true
		}(i)
	}

	// Wait for all jobs to finish
	<-waitForAllJobs
}

@xor-gate

Somebody has created a limiter package which does almost the same as your gist, except it uses atomics to generate job ids: https://github.com/korovkin/limiter

@naikrovek

@xor-gate thank you, I was looking for something like this.

@xor-gate

xor-gate commented Feb 9, 2019

A more simplified example with sync.WaitGroup is depicted here https://golangbot.com/buffered-channels-worker-pools/
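
For readers who do not follow the link, here is a minimal worker-pool sketch in the same spirit. It is not copied from the linked article, and the names runPool and doWork are hypothetical. Instead of gating goroutines with a token channel, it starts a fixed number of workers that pull job IDs from a channel, so at most `workers` jobs can run at once.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// doWork fakes a slow job (hypothetical stand-in for DoWork in the gist).
func doWork(id int) int {
	time.Sleep(10 * time.Millisecond)
	return id * 2
}

// runPool starts exactly `workers` goroutines that pull job IDs from a
// channel, and returns the sum of all results.
func runPool(workers, nbJobs int) int {
	jobs := make(chan int)
	results := make(chan int)

	var wg sync.WaitGroup
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for id := range jobs {
				results <- doWork(id)
			}
		}()
	}

	// Feed the jobs, then close the channel so the workers exit their loops.
	go func() {
		for i := 1; i <= nbJobs; i++ {
			jobs <- i
		}
		close(jobs)
	}()

	// Close results once every worker has returned.
	go func() {
		wg.Wait()
		close(results)
	}()

	sum := 0
	for r := range results {
		sum += r
	}
	return sum
}

func main() {
	fmt.Println(runPool(3, 10)) // prints 110
}
```

The number of goroutines doing work is fixed up front, which is the main difference from the token-channel approach in the gist.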

@crepehat

it's much simpler if you swap from filling the buffer then removing as jobs complete, to filling as jobs start:

package main

import (
	"flag"
	"fmt"
	"time"
	"sync"
)

// Fake a long and difficult work.
func DoWork() {
	time.Sleep(500 * time.Millisecond)
}

func main() {
	maxNbConcurrentGoroutines := flag.Int("maxNbConcurrentGoroutines", 2, "the number of goroutines that are allowed to run concurrently")
	nbJobs := flag.Int("nbJobs", 5, "the number of jobs that we need to do")
	flag.Parse()

	concurrentGoroutines := make(chan struct{}, *maxNbConcurrentGoroutines)

	var wg sync.WaitGroup

	for i := 0; i < *nbJobs; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			concurrentGoroutines <- struct{}{}
			fmt.Println("doing", i)
			DoWork()
			fmt.Println("finished", i)
			<-concurrentGoroutines

		}(i)

	}
	wg.Wait()

}

@radiaku

radiaku commented Aug 15, 2020

@crepehat

this one is good. Thanks 👍

@abhinav3295

it's much simpler if you swap from filling the buffer then removing as jobs complete, to filling as jobs start: [@crepehat's snippet above]

Wouldn't this start nbJobs goroutines and just block most of them (only maxNbConcurrentGoroutines would be active)?

@ashishgalagali

A more simplified example with sync.WaitGroup is depicted here https://golangbot.com/buffered-channels-worker-pools/

Thank you! It helped me a lot!

@sidpat

sidpat commented Nov 29, 2020

it's much simpler if you swap from filling the buffer then removing as jobs complete, to filling as jobs start: [@crepehat's snippet above]

Wouldn't this start nbJobs goroutines and just block most of them (only maxNbConcurrentGoroutines would be active)?

True. To prevent that, move the concurrentGoroutines <- struct{}{} line out of the goroutine and place it just inside the for loop, before the go statement, so that the loop itself blocks until a slot is free.

@crepehat

@abhinav3295 yeah to clarify, this is limiting the number of concurrent instances of the function 'DoWork'. You're right that all the goroutines would start instantly. Goroutines are cheap so this would not normally be an issue unless you're doing something extreme. @sidpat correctly shows how you would go about actually preventing the goroutines from starting.
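
A minimal sketch of that variant, assuming the send on the channel is moved into the loop body ahead of the go statement. The function name runLimited and the peak counter are hypothetical additions; the counter is only there to make the limit observable.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// runLimited launches nbJobs goroutines but never lets more than limit
// exist at once. It returns the highest number of jobs seen running together.
func runLimited(limit, nbJobs int) int32 {
	sem := make(chan struct{}, limit)
	var wg sync.WaitGroup
	var running, peak int32

	for i := 0; i < nbJobs; i++ {
		sem <- struct{}{} // blocks the loop here, before the goroutine is created
		wg.Add(1)
		go func() {
			defer wg.Done()
			defer func() { <-sem }() // free the slot when the job ends

			n := atomic.AddInt32(&running, 1)
			// Record the highest concurrency observed so far.
			for {
				p := atomic.LoadInt32(&peak)
				if n <= p || atomic.CompareAndSwapInt32(&peak, p, n) {
					break
				}
			}
			time.Sleep(5 * time.Millisecond) // stand-in for DoWork
			atomic.AddInt32(&running, -1)
		}()
	}
	wg.Wait()
	return atomic.LoadInt32(&peak)
}

func main() {
	fmt.Println("peak concurrency:", runLimited(2, 8))
}
```

Because the slot is taken before the goroutine is created, the number of live worker goroutines, not just the number of concurrent DoWork calls, stays at or below the limit.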

@Arsen-Uulu

what happens if you have a list to loop over and run your function with go for each value it returns?
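
One possible answer, as a sketch only: range over the list and take a slot before starting each goroutine. The helper name processAll and its signature are hypothetical. The index and item are passed as arguments so the code behaves the same on Go versions where the loop variable is shared across iterations.

```go
package main

import (
	"fmt"
	"strings"
	"sync"
)

// processAll calls fn once per item, with at most limit calls in flight,
// and returns the results in the same order as the input.
func processAll(items []string, limit int, fn func(string) string) []string {
	out := make([]string, len(items))
	sem := make(chan struct{}, limit)
	var wg sync.WaitGroup

	for i, item := range items {
		sem <- struct{}{} // wait for a free slot before spawning
		wg.Add(1)
		go func(i int, item string) {
			defer wg.Done()
			defer func() { <-sem }()
			out[i] = fn(item) // each goroutine writes only its own slot
		}(i, item)
	}
	wg.Wait()
	return out
}

func main() {
	got := processAll([]string{"a", "b", "c"}, 2, strings.ToUpper)
	fmt.Println(got) // prints [A B C]
}
```

Writing each result to its own index avoids the need for a mutex around the output slice.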

@qxxt

qxxt commented Jan 29, 2022

You can just use a function that sleeps and repeatedly calls itself while the maximum number of goroutines is reached.

https://go.dev/play/p/s9Ytimda3Eq

@qxxt

qxxt commented Jan 29, 2022

This is what I'm using. The locking function checks the number of running goroutines and sleeps for n seconds when that number exceeds the maximum. It then calls itself again, and returns once the number of goroutines is back under the limit.

package main

import (
	"fmt"
	"runtime"
	"time"
)

func dummy() {
	fmt.Println("dummy")
}

var maxProc = 2

func halt() {
	if runtime.NumGoroutine() > maxProc {
		time.Sleep(time.Second)
		fmt.Println("sleeping...")
		halt()
	} else {
		return
	}

}

func main() {
	for i := 0; i < 100; i++ {
		halt()
		go func(i int) {
			dummy()
		}(i)
	}
}

@freakynit

@qxxt This suffers from 2 serious issues:

  1. Wasted CPU cycles. In the example you have shown, it will end up spending almost all of its time in halt() without doing any useful work.
  2. A stack overflow due to the potentially unbounded recursive call to halt().

@kichik

kichik commented Mar 7, 2022

You can also use semaphores for this.

package main

import (
	"context"
	"fmt"
	"sync"
	"time"

	"golang.org/x/sync/semaphore"
)

func main() {
	sem := semaphore.NewWeighted(2)
	ctx := context.TODO()
	var wg sync.WaitGroup

	start := time.Now()

	for i := 1; i < 20; i++ {
		wg.Add(1)

		go func() {
			sem.Acquire(ctx, 1)
			defer sem.Release(1)
			defer wg.Done()

			time.Sleep(100 * time.Millisecond)
		}()
	}

	wg.Wait()

	fmt.Printf("%v\n", time.Since(start))
}
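
One thing semaphore.Acquire offers over a plain channel send is that it takes a context. If pulling in golang.org/x/sync is not an option, a similar effect can be sketched with the buffered channel from earlier in the thread and a select statement. The helper names acquire, release and acquireWithin below are hypothetical and use only the standard library.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// acquire takes one slot from sem, or gives up when ctx is done.
func acquire(ctx context.Context, sem chan struct{}) error {
	select {
	case sem <- struct{}{}:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// release frees a slot previously taken by acquire.
func release(sem chan struct{}) { <-sem }

// acquireWithin is a convenience wrapper that waits at most d for a slot.
func acquireWithin(sem chan struct{}, d time.Duration) error {
	ctx, cancel := context.WithTimeout(context.Background(), d)
	defer cancel()
	return acquire(ctx, sem)
}

func main() {
	sem := make(chan struct{}, 1)

	// The first acquire succeeds immediately because the slot is free.
	fmt.Println(acquireWithin(sem, 20*time.Millisecond))

	// The only slot is taken, so this call times out instead of blocking forever.
	fmt.Println(acquireWithin(sem, 20*time.Millisecond))

	// After a release, the slot can be taken again.
	release(sem)
	fmt.Println(acquireWithin(sem, 20*time.Millisecond))
}
```

A caller that gets a non-nil error must not call release, since no slot was taken.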

@freakynit

Hey @kichik thanks for this.... any idea on performance difference vs @crepehat 's snippet?

@kichik

kichik commented Mar 8, 2022

I'm not sure about the performance, but it's the easiest and shortest method I found.

@freakynit

Cool... thanks for sharing.... 👍

@qxxt

qxxt commented Jul 10, 2022

@freakynit

1. Wastage of CPU cycles. Example: for the example you have shown, it'll end up spending almost all of the time in halt() without doing actual useful work

What do you mean? What about changing the waiting time?

2. Stackoverflow error due to potentially infinite recursive call to halt()

Yeah 👍 but there is only one goroutine recursively calling halt(). Anyway, I use a for loop now.

I didn't use that code, this is pretty much what I'm using now:

package main

import (
	"fmt"
	"runtime"
	"time"
)

var maxProc = 5

func halt() {
	for runtime.NumGoroutine() > maxProc {
		fmt.Println("sleeping...")
		time.Sleep(time.Second)
	}
}

func main() {
	for i := 0; i < 20; i++ {
		go fmt.Println(i)
		halt()
	}
}

I'm not into the low-level stuff yet, so I might be oversimplifying here. It would be helpful if you could elaborate :).

@freakynit

Hey @qxxt, for your earlier code version:

  1. Suppose 2 goroutines are launched and, while launching the 3rd, halt is invoked and starts to wait for a second. If one of the running goroutines completes in the very next millisecond, you end up wasting 999ms. This might seem small, but when heavy concurrency is desired, each wasted millisecond counts.
  2. For long-running goroutines: suppose one makes a network call with a timeout of 60 seconds and, on error, retries it 5 times with exponential backoff starting at 2 seconds (a very real scenario). This will take approximately 2 minutes to either succeed or return with an error. In either case, your halt function would have recursively called itself 120/1 = 120 times. That is pretty small, but if you reduce the sleep time to, say, 100ms (to reduce the wasted time), it would now recurse 1200 times, which is not so small. Each recursion takes up stack space. Go grows the stack as needed, but bigger stacks generally slow things down a lot.

Hope I was able to explain correctly...
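
To illustrate the alternative to sleeping and re-checking, here is a sketch of a limiter built on sync.Cond, where a waiter is woken as soon as a slot is released rather than on the next poll. The limiter type and the runJobs helper are hypothetical; this is one possible design, not something proposed in the thread.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// limiter blocks callers of Acquire while limit jobs are already running.
type limiter struct {
	mu      sync.Mutex
	cond    *sync.Cond
	running int
	limit   int
}

func newLimiter(limit int) *limiter {
	l := &limiter{limit: limit}
	l.cond = sync.NewCond(&l.mu)
	return l
}

// Acquire waits inside cond.Wait, without polling, until a slot is free.
func (l *limiter) Acquire() {
	l.mu.Lock()
	for l.running >= l.limit {
		l.cond.Wait()
	}
	l.running++
	l.mu.Unlock()
}

// Release frees a slot and wakes one waiter right away.
func (l *limiter) Release() {
	l.mu.Lock()
	l.running--
	l.mu.Unlock()
	l.cond.Signal()
}

// runJobs runs n fake jobs of length d, at most limit at a time, and
// returns how long the whole batch took.
func runJobs(n, limit int, d time.Duration) time.Duration {
	l := newLimiter(limit)
	var wg sync.WaitGroup
	start := time.Now()
	for i := 0; i < n; i++ {
		l.Acquire() // the loop blocks here, so no recursion and no sleep loop
		wg.Add(1)
		go func() {
			defer wg.Done()
			defer l.Release()
			time.Sleep(d)
		}()
	}
	wg.Wait()
	return time.Since(start)
}

func main() {
	// 6 jobs of 10ms with 2 at a time run in 3 waves.
	fmt.Println(runJobs(6, 2, 10*time.Millisecond))
}
```

With a 1-second polling loop the same batch would take seconds, since each wait rounds up to the next poll. Here the total stays close to the three waves of work.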

@ajinabraham

ajinabraham commented Aug 7, 2022

I did some benchmarking, if anyone is interested: parsing diff patches and running some regex checks on files from around 2000 git commits in a local repo. exec.Command() is the bottleneck, as it panics with "open /dev/null: too many open files" (due to the soft ulimit) when the number of goroutines is uncontrolled.

| Experiment | Execution Time |
| --- | --- |
| Without Goroutine | 3m33.714605387s |
| With Goroutine, following @crepehat snippet, with maxNbConcurrentGoroutines set to 50 | 35.2966391 |
| With Goroutine, checking runtime.NumGoroutine() > 80 and adding 1s sleep | 35.2392869 |
| With Goroutine and semaphore NewWeighted(12) | 28.851024029s |
| With limiter, NewConcurrencyLimiter set to 50 | 21.278964931s |

go 1.17
MacBook Pro (16-inch, 2019)
2.6 GHz 6-Core Intel i7 | 32 GB RAM

@ajinabraham

A bit more benchmarking, done on a level playing field.

Function execution times measured from Go with different goroutine concurrency limits (Average of 3 runs)

| Sample | Limit 12 | Limit 50 | Limit 80 |
| --- | --- | --- | --- |
| Atomic | 23.86s | 23.40s | 27.88s |
| Semaphore | 29.54s | 29.17s | 35.38s |
| WaitGroup | 28.62s | 31.15s | 38.26s |
| WaitGroup with Sleep | 2m 56.85s | 47.67s | 37.93s |
| Without Goroutine | 3m 33.71s | NA | NA |

Go program’s total execution time measured using zsh’s time with different concurrency limits. (Average of 3 runs)

| Sample | Limit 12 | Limit 50 | Limit 80 |
| --- | --- | --- | --- |
| Atomic | 29.900 | 34.190 | 42.535 |
| Semaphore | 30.275 | 30.406 | 36.771 |
| WaitGroup | 29.464 | 32.382 | 39.660 |
| WaitGroup with Sleep | 2m 56.8521729s | 48.905 | 39.100 |
