Skip to content

Instantly share code, notes, and snippets.

@CAFxX
Created April 4, 2021 23:42
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save CAFxX/25c2123b1de75555734ec7152abd74e1 to your computer and use it in GitHub Desktop.
Save CAFxX/25c2123b1de75555734ec7152abd74e1 to your computer and use it in GitHub Desktop.
errgroup
// Group is the interface returned by New: callers hand it functions that may
// fail and later collect a single error from it.
type Group interface {
// Go submits a function returning an error to the group.
Go(func() error)
// Wait returns the group's error result, or nil if there is none.
// NOTE(review): blocking behaviour depends on the implementation behind
// the interface.
Wait() error
}
// New creates a group and returns it together with a cancellable context
// derived from ctx. The group keeps the matching cancel function.
//
// Each Option in opts is applied, in order, to the freshly created group.
//
// New panics if the options leave the group with a negative worker limit. The
// derived context is cancelled before panicking so that a caller which
// recovers from the panic does not leak it.
func New(ctx context.Context, opts ...Option) (Group, context.Context) {
	ctx, cancel := context.WithCancel(ctx)
	g := &group{cancel: cancel}
	g.cond.L = &g.mutex // the condition variable shares the group's mutex
	for _, opt := range opts {
		opt(g)
	}
	if g.maxWorkers < 0 {
		// Nothing will ever use the derived context; release it now.
		cancel()
		panic("invalid max number of workers")
	}
	return g, ctx
}
// Option is a function that configures a group. New calls every Option it is
// given on the group it is constructing.
type Option func(*group)
// MaxWorkers returns an Option that records n as the group's worker limit.
//
// The value is stored as given; no validation happens here.
func MaxWorkers(n int) Option {
	apply := func(target *group) {
		target.maxWorkers = n
	}
	return apply
}
// group is the Group implementation built by New.
//
// mutex protects queue, workers, waiting and err. maxWorkers is set while the
// group is being constructed and only read afterwards.
type group struct {
	cancel     func() // cancels the context handed out by New
	maxWorkers int    // worker limit; 0 means no limit

	mutex   sync.Mutex
	cond    sync.Cond      // bound to mutex by New; signalled when the last worker exits during Wait
	queue   []func() error // functions waiting for a free worker
	workers int            // worker goroutines currently accounted for
	waiting bool           // true while a Wait call is in progress
	err     error          // first non-nil error returned by a submitted function
}
// Go submits f to the group.
//
// Once the group has recorded an error, f is silently dropped. Otherwise f is
// started on a new worker goroutine when there is no worker limit or the limit
// has not been reached, and appended to the queue when all workers are busy.
func (g *group) Go(f func() error) {
	g.mutex.Lock()
	failed := g.err != nil
	spawn := !failed && (g.maxWorkers == 0 || g.workers < g.maxWorkers)
	switch {
	case failed:
		// The group already failed; discard the submission.
	case spawn:
		g.workers++
	default:
		g.queue = append(g.queue, f)
	}
	g.mutex.Unlock()

	// Start the goroutine only after the lock has been released.
	if spawn {
		go g.worker(f)
	}
}
// worker runs f and then, when a worker limit is configured, keeps pulling
// functions off g.queue until the queue is empty or one of them fails.
//
// The caller must already have counted this worker in g.workers. worker gives
// that slot back exactly once on every exit path, including a panic inside f,
// and broadcasts on g.cond when the last worker leaves while Wait is in
// progress.
//
// The first non-nil error is stored in g.err, the group's context is cancelled
// and the pending queue is dropped; errors after the first are discarded.
func (g *group) worker(f func() error) {
	done := false
	onDone := func() { // must be called with g.mutex locked
		if done {
			panic("onDone called twice")
		}
		// Decrement the workers counter (the method itself is named worker).
		g.workers--
		if g.workers == 0 && g.waiting {
			g.cond.Broadcast()
		}
		done = true
	}
	// Safety net for exits that do not release the slot explicitly: the
	// unlimited-mode return below and a panic escaping from f.
	defer func() {
		if !done {
			g.mutex.Lock()
			onDone()
			g.mutex.Unlock()
		}
	}()

	for {
		if err := f(); err != nil {
			g.mutex.Lock()
			if g.err == nil {
				g.err = err
				g.cancel()
				g.queue = nil // nothing queued will run after a failure
			}
			onDone()
			g.mutex.Unlock()
			return
		}

		if g.maxWorkers == 0 {
			// Without a limit Go never queues, so there is nothing to drain.
			return
		}

		g.mutex.Lock()
		if len(g.queue) == 0 {
			onDone()
			g.mutex.Unlock()
			return
		}
		f, g.queue = g.queue[0], g.queue[1:]
		g.mutex.Unlock()
	}
}
// Wait blocks until no workers remain and the queue is empty, then cancels the
// group's context and returns the recorded error (nil if there is none).
//
// The waiting flag is raised for the duration of the call so that the last
// worker to exit knows it has to signal the condition variable.
func (g *group) Wait() error {
	g.mutex.Lock()
	g.waiting = true
	defer func() {
		g.waiting = false
		g.mutex.Unlock()
	}()

	for {
		idle := g.workers <= 0 && len(g.queue) == 0
		if idle {
			break
		}
		g.cond.Wait()
	}

	result := g.err
	g.cancel()
	return result
}
// New builds a group from errgroup.WithContext(ctx) and a weighted semaphore,
// and returns it together with the context produced by errgroup.WithContext.
//
// NOTE(review): n is not declared in this function or its parameters, and
// opts is never read, so the semaphore size has no visible source here.
// Confirm how the worker limit is meant to reach semaphore.NewWeighted.
func New(ctx context.Context, opts ...Option) (Group, context.Context) {
g, ctx := errgroup.WithContext(ctx)
sema := semaphore.NewWeighted(n)
// Positional literal: the values fill group's fields in declaration order.
return &group{g, ctx, sema}, ctx
}
// group combines an errgroup.Group with a weighted semaphore and the context
// used when acquiring it.
type group struct {
// Embedded errgroup; group.Go forwards wrapped functions to its Go method.
// NOTE(review): Wait is presumably promoted from this embedded type — confirm.
*errgroup.Group
// ctx is the context passed to sema.Acquire.
ctx context.Context
// sema is acquired with weight 1 around each function submitted through Go.
sema *semaphore.Weighted
}
// Go submits f to the embedded errgroup.Group, wrapped so that f only runs
// while one unit of g.sema is held.
//
// If acquiring the semaphore fails, that error is returned from the wrapper
// and f is not called. The unit is released once f returns.
func (g *group) Go(f func() error) {
	guarded := func() error {
		err := g.sema.Acquire(g.ctx, 1)
		if err != nil {
			return err
		}
		defer g.sema.Release(1)
		return f()
	}
	g.Group.Go(guarded)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment