Skip to content

Instantly share code, notes, and snippets.

@richardartoul
Created July 24, 2023 14:15
Show Gist options
  • Save richardartoul/5d24cd508ac9372bf25c7a639668499c to your computer and use it in GitHub Desktop.
Save richardartoul/5d24cd508ac9372bf25c7a639668499c to your computer and use it in GitHub Desktop.
A simple Goroutine pool for amortizing stack growth overhead
package pool
import (
"fmt"
"sync"
"runtime"
)
// GoroutinePool runs submitted functions on a bounded set of reusable worker
// goroutines, so that many short-lived tasks do not each pay the cost of
// growing a fresh goroutine stack.
type GoroutinePool struct {
	// workCh hands tasks from Go to the pooled workers.
	workCh chan work

	// maxWorkers is the upper bound on the number of pooled workers.
	maxWorkers int

	// mu guards the two counters below.
	mu sync.Mutex

	// numWorkers is the number of pooled workers currently accounted for.
	numWorkers int

	// numWorkersBusy is the number of tasks that have been handed to the pool
	// and have not yet finished running.
	numWorkersBusy int
}
// NewGoroutinePool returns a GoroutinePool that keeps at most maxWorkers
// pooled worker goroutines. The constructor itself starts no goroutines. The
// channel used to hand tasks to workers is buffered with one slot per CPU, as
// reported by runtime.NumCPU.
func NewGoroutinePool(maxWorkers int) *GoroutinePool {
	return &GoroutinePool{
		maxWorkers: maxWorkers,
		workCh:     make(chan work, runtime.NumCPU()),
	}
}
// Go schedules fn to run asynchronously and does not wait for it to finish.
//
// One of three things happens, decided while holding the pool mutex:
//   - if fewer tasks are outstanding than there are pooled workers, fn is
//     queued for an existing worker;
//   - otherwise, if the pool is below maxWorkers, a new worker is started and
//     fn is queued for whichever worker receives it first;
//   - otherwise fn is run on a plain goroutine that is not part of the pool.
//
// Go panics if either internal counter is found to be negative.
func (g *GoroutinePool) Go(fn func()) {
	// How this particular call will get fn running.
	const (
		runUnpooled = iota
		reuseWorker
		startWorker
	)
	plan := runUnpooled

	g.mu.Lock()
	// Sanity-check the bookkeeping before relying on it; the mutex is released
	// first so a recovered panic does not leave the pool locked.
	if total := g.numWorkers; total < 0 {
		g.mu.Unlock()
		panic(fmt.Sprintf("numWorkers: %d < 0", total))
	}
	if busy := g.numWorkersBusy; busy < 0 {
		g.mu.Unlock()
		panic(fmt.Sprintf("numWorkersBusy: %d < 0", busy))
	}
	switch {
	case g.numWorkersBusy < g.numWorkers:
		// Claim the idle worker now so that concurrent callers do not also
		// count on it.
		g.numWorkersBusy++
		plan = reuseWorker
	case g.numWorkers < g.maxWorkers:
		// Account for the new worker (and the task it will run) before it is
		// actually started below.
		g.numWorkersBusy++
		g.numWorkers++
		plan = startWorker
	}
	g.mu.Unlock()

	// The channel send and goroutine starts happen outside the mutex.
	switch plan {
	case reuseWorker:
		g.workCh <- work{fn: fn}
	case startWorker:
		go g.workerLoop()
		g.workCh <- work{fn: fn}
	default:
		// No idle worker and the pool is already at its limit: fall back to a
		// one-off goroutine.
		go fn()
	}
}
// workerLoop is the body of a pooled worker goroutine. It runs tasks received
// from workCh, decrementing numWorkersBusy after each one, and retires once it
// has completed more than maxTasksPerWorker tasks.
//
// Retirement is recorded in the same critical section as the final decrement
// of numWorkersBusy. If the two counters were updated under separate lock
// acquisitions, Go could observe numWorkersBusy < numWorkers in between,
// conclude that an idle worker exists, and queue a task for a worker that is
// already exiting; that task would then sit in workCh until some later call to
// Go happened to start a new worker.
func (g *GoroutinePool) workerLoop() {
	// NOTE(review): presumably workers are recycled so that a stack grown by
	// an unusually deep task is eventually released — confirm intent.
	const maxTasksPerWorker = 1000

	retired := false
	defer func() {
		if retired {
			// numWorkers was already decremented together with numWorkersBusy.
			return
		}
		// Reached when the worker exits without retiring in the loop below,
		// e.g. because workCh was closed.
		g.mu.Lock()
		g.numWorkers--
		g.mu.Unlock()
	}()

	completed := 0
	for w := range g.workCh {
		w.fn()
		completed++

		g.mu.Lock()
		g.numWorkersBusy--
		if completed > maxTasksPerWorker {
			// Leave the pool atomically with becoming idle so that Go never
			// counts this worker as available.
			g.numWorkers--
			retired = true
		}
		g.mu.Unlock()

		if retired {
			return
		}
	}
}
// work is a single unit of work passed over the pool's channel to a worker.
type work struct {
	// fn is the function the receiving worker invokes.
	fn func()
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment