Limit the maximum number of goroutines running at the same time
package main

import (
	"flag"
	"fmt"
	"time"
)

// DoWork fakes a long, difficult piece of work.
func DoWork() {
	time.Sleep(500 * time.Millisecond)
}

func main() {
	maxNbConcurrentGoroutines := flag.Int("maxNbConcurrentGoroutines", 5, "the number of goroutines that are allowed to run concurrently")
	nbJobs := flag.Int("nbJobs", 100, "the number of jobs that we need to do")
	flag.Parse()

	// Dummy channel to coordinate the number of concurrent goroutines.
	// This channel must be buffered, otherwise we would block immediately
	// when trying to fill it.
	concurrentGoroutines := make(chan struct{}, *maxNbConcurrentGoroutines)
	// Fill the dummy channel with maxNbConcurrentGoroutines empty structs.
	for i := 0; i < *maxNbConcurrentGoroutines; i++ {
		concurrentGoroutines <- struct{}{}
	}

	// The done channel indicates when a single goroutine has
	// finished its job.
	done := make(chan bool)
	// The waitForAllJobs channel allows the main program
	// to wait until all the jobs are indeed done.
	waitForAllJobs := make(chan bool)

	// Collect all the jobs; each time one finishes, we can
	// release another spot for a goroutine.
	go func() {
		for i := 0; i < *nbJobs; i++ {
			<-done
			// Say that another goroutine can now start.
			concurrentGoroutines <- struct{}{}
		}
		// We have collected all the jobs; the program
		// can now terminate.
		waitForAllJobs <- true
	}()

	// Try to start nbJobs jobs.
	for i := 1; i <= *nbJobs; i++ {
		fmt.Printf("ID: %v: waiting to launch!\n", i)
		// Try to receive from the concurrentGoroutines channel. When we get a value,
		// it means we can start a new goroutine because another one has finished.
		// Otherwise, it blocks execution until a spot becomes available.
		<-concurrentGoroutines
		fmt.Printf("ID: %v: it's my turn!\n", i)
		go func(id int) {
			DoWork()
			fmt.Printf("ID: %v: all done!\n", id)
			done <- true
		}(i)
	}

	// Wait for all jobs to finish.
	<-waitForAllJobs
}
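To try it out (assuming the snippet above is saved as main.go), running something like go run main.go -maxNbConcurrentGoroutines=3 -nbJobs=10 should show at most 3 jobs in flight at any moment until all 10 have finished.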

xor-gate commented Sep 12, 2017

Somebody has created a limiter package which does almost the same as your gist, except it uses atomics to generate job ids: https://github.com/korovkin/limiter
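
For comparison, here is a minimal self-contained sketch of that idea (this is not korovkin/limiter's actual API, just an illustration of the approach it describes): a buffered-channel semaphore caps concurrency while sync/atomic hands out job ids.

	package main

	import (
		"fmt"
		"sync"
		"sync/atomic"
		"time"
	)

	func main() {
		const maxConcurrent = 5
		sem := make(chan struct{}, maxConcurrent) // semaphore limiting concurrency
		var jobID int64                           // atomically incremented job id
		var wg sync.WaitGroup

		for i := 0; i < 20; i++ {
			wg.Add(1)
			sem <- struct{}{} // acquire a slot; blocks while maxConcurrent jobs are running
			go func() {
				defer wg.Done()
				defer func() { <-sem }() // release the slot when the job is done
				id := atomic.AddInt64(&jobID, 1)
				fmt.Printf("job %d running\n", id)
				time.Sleep(200 * time.Millisecond)
			}()
		}
		wg.Wait()
	}

The atomic counter only numbers the jobs; the concurrency cap comes entirely from the semaphore channel.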


naikrovek commented Jun 6, 2018

@xor-gate thank you, I was looking for something like this.


xor-gate commented Feb 9, 2019

A simpler example with sync.WaitGroup is shown here: https://golangbot.com/buffered-channels-worker-pools/
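
For readers who don't want to follow the link, a minimal worker-pool sketch in that spirit (not the article's exact code): a fixed number of workers range over a buffered jobs channel, and a sync.WaitGroup waits for them all to finish once the channel is closed.

	package main

	import (
		"fmt"
		"sync"
		"time"
	)

	func worker(id int, jobs <-chan int, wg *sync.WaitGroup) {
		defer wg.Done()
		for j := range jobs {
			fmt.Printf("worker %d processing job %d\n", id, j)
			time.Sleep(500 * time.Millisecond)
		}
	}

	func main() {
		const numWorkers = 5
		jobs := make(chan int, numWorkers) // buffered channel feeding the workers

		var wg sync.WaitGroup
		for w := 1; w <= numWorkers; w++ {
			wg.Add(1)
			go worker(w, jobs, &wg)
		}

		for j := 1; j <= 100; j++ {
			jobs <- j
		}
		close(jobs) // no more jobs; workers exit when the channel drains

		wg.Wait() // wait for every worker to finish
	}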


crepehat commented Sep 25, 2019

It's much simpler if you swap from filling the buffer up front and draining it as jobs complete, to filling it as jobs start:

package main

import (
	"flag"
	"fmt"
	"sync"
	"time"
)

// DoWork fakes a long, difficult piece of work.
func DoWork() {
	time.Sleep(500 * time.Millisecond)
}

func main() {
	maxNbConcurrentGoroutines := flag.Int("maxNbConcurrentGoroutines", 2, "the number of goroutines that are allowed to run concurrently")
	nbJobs := flag.Int("nbJobs", 5, "the number of jobs that we need to do")
	flag.Parse()

	concurrentGoroutines := make(chan struct{}, *maxNbConcurrentGoroutines)

	var wg sync.WaitGroup

	for i := 0; i < *nbJobs; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			concurrentGoroutines <- struct{}{} // acquire a slot; blocks while the limit is reached
			fmt.Println("doing", i)
			DoWork()
			fmt.Println("finished", i)
			<-concurrentGoroutines // release the slot
		}(i)
	}
	wg.Wait()
}