Skip to content

Instantly share code, notes, and snippets.

@dragonsinth
Created November 3, 2022 13:55
Show Gist options
  • Save dragonsinth/038c72f1bc4dc6f9a35eeabe0e3816b8 to your computer and use it in GitHub Desktop.
Save dragonsinth/038c72f1bc4dc6f9a35eeabe0e3816b8 to your computer and use it in GitHub Desktop.
package concurrency
import (
"context"
"fmt"
"golang.org/x/sync/errgroup"
)
const (
doThingWorkers = 4
doSubThingWorkers = 10
)
// 1) Go offers unique advantages in concurrency composition and non-leaky abstraction
// DoThing ...
//
// 2) The basic contract between decoupled components in a Go server is
//
// func DoThing(ctx, args...) (value, error) {
// }
//
// - If the caller cancels the context, the callee had better exit quickly.
//
// - When this method returns, it better have closed everything it opened.
//
// - errgroups fit well into this model because they compose
func DoThing(ctx context.Context, args ...any) (any, error) {
g, gCtx := errgroup.WithContext(ctx)
results := make([]any, doThingWorkers) // could also be a result-collecting channel, or a mutexed data structure like a map.
for i := 0; i < doThingWorkers; i++ {
i, ctx := i, gCtx
g.Go(func() error {
ret, err := DoSubThing(ctx, i, args...)
results[i] = ret
return err
})
}
return results, g.Wait()
}
func DoSubThing(ctx context.Context, i int, args ...any) (any, error) {
// Can have its own group.
g, _ := errgroup.WithContext(ctx)
for i := 0; i < doSubThingWorkers; i++ {
// stuff
}
return nil, g.Wait()
}
// 3) Prefer callback functions over channels in APIs.
type doc struct{}
// StreamDocumentsChannel bad: forces the caller to use channels and multiple goroutines.
func StreamDocumentsChannel(ctx context.Context, out chan<- *doc) error {
defer close(out)
for i := 0; i < 10; i++ {
// Most people will forget the select
select {
case out <- &doc{}:
case <-ctx.Done():
return ctx.Err()
}
}
return nil
}
// docCallback can be called concurrently
type docCallback func(ctx context.Context, d *doc) error
// StreamDocumentsCallback better: the caller can still use channels if it wants to.
func StreamDocumentsCallback(ctx context.Context, f docCallback) error {
for i := 0; i < 10; i++ {
if err := f(ctx, &doc{}); err != nil {
return err
}
}
return nil
}
func simpleCaller(ctx context.Context) error {
return StreamDocumentsCallback(ctx, func(ctx context.Context, d *doc) error {
fmt.Println(d)
return nil
})
}
// 4) 99% of the time: mutexes should be structure scoped, concurrency controls (go func, channel, errgroup, context)
// should be method scoped.
func complexCaller(ctx context.Context) error {
g, ctx := errgroup.WithContext(ctx)
resultChan := make(chan *doc, 64)
// producer
g.Go(func() error {
defer close(resultChan)
return StreamDocumentsCallback(ctx, func(ctx context.Context, d *doc) error {
select {
case resultChan <- &doc{}:
return nil
case <-ctx.Done():
return ctx.Err()
}
})
})
// consumer
g.Go(func() error {
for {
select {
case d, ok := <-resultChan:
if !ok {
return nil // done
}
fmt.Println(d)
case <-ctx.Done():
return ctx.Err()
}
}
})
return g.Wait()
}
// Also possible, but document that `f` can be called concurrently.
func StreamDocumentsCallbackParallel(ctx context.Context, f func(ctx context.Context, d *doc) error) error {
g, gCtx := errgroup.WithContext(ctx)
for i := 0; i < 10; i++ {
ctx := gCtx
g.Go(func() error {
if err := f(ctx, &doc{}); err != nil {
return err
}
return nil
})
}
return g.Wait()
}
// 5) If you see channel, errgroup, context embedded into a structure, be suspicious.
// There are legitimate use cases (think task queue) but you should be sure there's a good reason.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment