Gist: AntoineAugusti/80e99edfe205baf7a094
package main

import (
	"flag"
	"fmt"
	"time"
)

// Fake some long and difficult work.
func DoWork() {
	time.Sleep(500 * time.Millisecond)
}

func main() {
	maxNbConcurrentGoroutines := flag.Int("maxNbConcurrentGoroutines", 5, "the number of goroutines that are allowed to run concurrently")
	nbJobs := flag.Int("nbJobs", 100, "the number of jobs that we need to do")
	flag.Parse()

	// Dummy channel to coordinate the number of concurrent goroutines.
	// This channel must be buffered, otherwise we would block immediately
	// when trying to fill it.
	concurrentGoroutines := make(chan struct{}, *maxNbConcurrentGoroutines)
	// Fill the dummy channel with maxNbConcurrentGoroutines empty structs.
	for i := 0; i < *maxNbConcurrentGoroutines; i++ {
		concurrentGoroutines <- struct{}{}
	}

	// The done channel indicates when a single goroutine has
	// finished its job.
	done := make(chan bool)
	// The waitForAllJobs channel allows the main program
	// to wait until all the jobs are indeed done.
	waitForAllJobs := make(chan bool)

	// Collect all the jobs; each time a job finishes, we can
	// release another spot for a goroutine.
	go func() {
		for i := 0; i < *nbJobs; i++ {
			<-done
			// Say that another goroutine can now start.
			concurrentGoroutines <- struct{}{}
		}
		// We have collected all the jobs, so the program
		// can now terminate.
		waitForAllJobs <- true
	}()

	// Try to start nbJobs jobs.
	for i := 1; i <= *nbJobs; i++ {
		fmt.Printf("ID: %v: waiting to launch!\n", i)
		// Try to receive from the concurrentGoroutines channel. When we get a value,
		// it means a spot is free and we can start a new goroutine.
		// Otherwise, this blocks until an execution spot becomes available.
		<-concurrentGoroutines
		fmt.Printf("ID: %v: it's my turn!\n", i)
		go func(id int) {
			DoWork()
			fmt.Printf("ID: %v: all done!\n", id)
			done <- true
		}(i)
	}

	// Wait for all jobs to finish.
	<-waitForAllJobs
}
A simpler example using sync.WaitGroup is shown here: https://golangbot.com/buffered-channels-worker-pools/
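For comparison, here is a minimal sketch of a fixed-size worker pool, the pattern named in the linked article's title. A fixed number of goroutines drain a shared jobs channel, so no per-job slot bookkeeping is needed. This is not the article's code: doWork and runPool are hypothetical names, and doWork just sleeps briefly and squares its input.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// doWork is a hypothetical stand-in for the gist's DoWork.
func doWork(job int) int {
	time.Sleep(10 * time.Millisecond)
	return job * job
}

// runPool starts exactly nbWorkers goroutines that pull jobs from a shared
// channel, so at most nbWorkers jobs run at once. Results are indexed by job.
func runPool(nbWorkers, nbJobs int) []int {
	jobs := make(chan int)
	results := make([]int, nbJobs)

	var wg sync.WaitGroup
	for w := 0; w < nbWorkers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			// Each worker keeps taking jobs until the channel is closed.
			for j := range jobs {
				results[j] = doWork(j) // each index is written by exactly one worker
			}
		}()
	}

	for j := 0; j < nbJobs; j++ {
		jobs <- j
	}
	close(jobs) // ends the workers' range loops
	wg.Wait()
	return results
}

func main() {
	results := runPool(5, 20)
	fmt.Println(results[:5]) // [0 1 4 9 16]
}
```

Unlike the buffered-channel approach, only nbWorkers goroutines ever exist, at the cost of one channel hand-off per job.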
Thank you! It helped me a lot!
What about this?
https://github.com/panjf2000/ants
It's much simpler to flip the scheme. Instead of pre-filling the buffer and taking a value out whenever a job starts, put a value in as each job starts and take it out when the job completes:
package main

import (
	"flag"
	"fmt"
	"sync"
	"time"
)

// Fake some long and difficult work.
func DoWork() {
	time.Sleep(500 * time.Millisecond)
}

func main() {
	maxNbConcurrentGoroutines := flag.Int("maxNbConcurrentGoroutines", 2, "the number of goroutines that are allowed to run concurrently")
	nbJobs := flag.Int("nbJobs", 5, "the number of jobs that we need to do")
	flag.Parse()

	concurrentGoroutines := make(chan struct{}, *maxNbConcurrentGoroutines)
	var wg sync.WaitGroup

	for i := 0; i < *nbJobs; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			// Take a slot; this blocks while the buffer is full.
			concurrentGoroutines <- struct{}{}
			fmt.Println("doing", i)
			DoWork()
			fmt.Println("finished", i)
			// Free the slot for the next job.
			<-concurrentGoroutines
		}(i)
	}
	wg.Wait()
}
Wouldn't this start nbJobs goroutines and just block most of them (only maxNbConcurrentGoroutines would be active)?
True. To prevent that, move the `concurrentGoroutines <- struct{}{}` line out of the goroutine, to the top of the for loop body (before the `go` statement).
@abhinav3295 yeah to clarify, this is limiting the number of concurrent instances of the function 'DoWork'. You're right that all the goroutines would start instantly. Goroutines are cheap so this would not normally be an issue unless you're doing something extreme. @sidpat correctly shows how you would go about actually preventing the goroutines from starting.
What if you have a list to loop over and want to run your function in a goroutine for each element?
You can just use a sleep that calls itself again while the maximum number of goroutines is reached.
This is what I'm using: the blocking function checks the number of running goroutines. When that number exceeds the maximum, it sleeps (for one second in the code below) and then calls itself again. Once the count is back within the limit, it returns.
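package main

import (
	"fmt"
	"runtime"
	"time"
)

func dummy() {
	fmt.Println("dummy")
}

// Limit on runtime.NumGoroutine(), which also counts the main goroutine.
var maxProc = 2

// halt sleeps and re-checks (recursively) until the goroutine count
// is no longer above maxProc.
func halt() {
	if runtime.NumGoroutine() > maxProc {
		time.Sleep(time.Second)
		fmt.Println("sleeping...")
		halt()
	} else {
		return
	}
}

func main() {
	for i := 0; i < 100; i++ {
		halt()
		go func(i int) {
			dummy()
		}(i)
	}
	// Note: main returns here without waiting for the last goroutines.
}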
@qxxt This suffers from 2 serious issues:
- Wastage of CPU cycles. Example: for the example you have shown, it'll end up spending almost all of the time in halt() without doing actual useful work
- Stack overflow error due to potentially unbounded recursive calls to halt()
You can also use semaphores for this.
package main

import (
	"context"
	"fmt"
	"sync"
	"time"

	"golang.org/x/sync/semaphore"
)

func main() {
	sem := semaphore.NewWeighted(2)
	ctx := context.TODO()
	var wg sync.WaitGroup
	start := time.Now()

	for i := 1; i < 20; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			// Acquire blocks until one unit of weight is free; it only
			// returns an error if ctx is done.
			if err := sem.Acquire(ctx, 1); err != nil {
				return
			}
			defer sem.Release(1)
			time.Sleep(100 * time.Millisecond)
		}()
	}
	wg.Wait()
	fmt.Printf("%v\n", time.Since(start))
}
I'm not sure about the performance, but it's the easiest and shortest method I found.
Cool... thanks for sharing.... 👍
> 1. Wastage of CPU cycles. Example: for the example you have shown, it'll end up spending almost all of the time in halt() without doing actual useful work
What do you mean? What about changing the waiting time?
> 2. Stackoverflow error due to potentially infinite recursive call to halt()
Yeah 👍, but only one goroutine recursively calls halt(). Anyway, I use a for loop now.
I didn't actually use that code; this is pretty much what I'm using now:
package main

import (
	"fmt"
	"runtime"
	"time"
)

var maxProc = 5

// halt polls in a loop (no recursion) until the goroutine count,
// which includes main, is no longer above maxProc.
func halt() {
	for runtime.NumGoroutine() > maxProc {
		fmt.Println("sleeping...")
		time.Sleep(time.Second)
	}
}

func main() {
	for i := 0; i < 20; i++ {
		go fmt.Println(i)
		halt()
	}
}
I'm not into the low-level stuff yet, so I might be oversimplifying here. It would be helpful if you could elaborate :)
Hey @qxxt, for your earlier code version:
- Suppose 2 goroutines are running, and while launching the 3rd, halt is invoked and starts sleeping for a second. If one of the running goroutines completes in the very next millisecond, you end up wasting 999 ms. This might seem small, but when heavy concurrency is desired, every wasted millisecond counts.
- Consider long-running goroutines. Suppose one makes a network call with a 60-second timeout and, on error, retries it 5 times with exponential backoff starting at 2 seconds (a very real scenario). That could take approximately 2 minutes to either succeed or return an error. In either case, your halt function would have recursively called itself 120/1 = 120 times. That's fairly small, but if you reduce the sleep time to, say, 100 ms (to waste less time waiting), it would have recursed 1200 times, which is not small. Each recursion takes up stack space. Go grows goroutine stacks as needed, but growing a stack means copying it, so large stacks can noticeably slow things down.

Hope I explained that clearly.
I did some benchmarking, if anyone is interested: parsing diff patches and running some regex checks on files from around 2000 git commits in a local repo. exec.Command() is the bottleneck. When the number of goroutines is uncontrolled, it panics with `open /dev/null: too many open files` because of the soft ulimit.
Experiment | Execution time |
---|---|
Without goroutines | 3m33.714605387s |
With goroutines, following @crepehat's snippet, maxNbConcurrentGoroutines set to 50 | 35.2966391s |
With goroutines, checking runtime.NumGoroutine() > 80 and adding a 1s sleep | 35.2392869s |
With goroutines and semaphore NewWeighted(12) | 28.851024029s |
With limiter, NewConcurrencyLimiter set to 50 | 21.278964931s |
Go 1.17, MacBook Pro (16-inch, 2019), 2.6 GHz 6-core Intel i7, 32 GB RAM
Some more benchmarking, this time on a level playing field.
Function execution times measured from within Go with different goroutine concurrency limits (average of 3 runs):
Sample | Limit 12 | Limit 50 | Limit 80 |
---|---|---|---|
Atomic | 23.86s | 23.40s | 27.88s |
Semaphore | 29.54s | 29.17s | 35.38s |
WaitGroup | 28.62s | 31.15s | 38.26s |
WaitGroup with Sleep | 2m 56.85s | 47.67s | 37.93s |
Without Goroutine | 3m 33.71s | NA | NA |
Total execution time of the Go program, measured with zsh's `time`, with different concurrency limits (average of 3 runs; in seconds unless noted):
Sample | Limit 12 | Limit 50 | Limit 80 |
---|---|---|---|
Atomic | 29.900 | 34.190 | 42.535 |
Semaphore | 30.275 | 30.406 | 36.771 |
WaitGroup | 29.464 | 32.382 | 39.660 |
WaitGroup with Sleep | 2m 56.8521729s | 48.905 | 39.100 |