Code snippets for my blog post "The X-Files: Avoiding Concurrency Boilerplate with golang.org/x/sync"
BenchmarkMutexCache/10-8 10000000 180 ns/op 0 B/op 0 allocs/op | |
BenchmarkMutexCache/100-8 10000000 187 ns/op 0 B/op 0 allocs/op | |
BenchmarkMutexCache/1000-8 10000000 214 ns/op 0 B/op 0 allocs/op | |
BenchmarkMutexCache/10000-8 10000000 231 ns/op 0 B/op 0 allocs/op | |
BenchmarkMutexCache/100000-8 5000000 254 ns/op 2 B/op 0 allocs/op | |
BenchmarkMutexCache/1000000-8 1000000 1159 ns/op 102 B/op 1 allocs/op | |
BenchmarkMutexCache/10000000-8 1000000 1481 ns/op 184 B/op 2 allocs/op | |
BenchmarkMutexCache/100000000-8 1000000 1655 ns/op 187 B/op 3 allocs/op | |
BenchmarkSyncMapCache/10-8 5000000 221 ns/op 0 B/op 0 allocs/op | |
BenchmarkSyncMapCache/100-8 10000000 235 ns/op 0 B/op 0 allocs/op | |
BenchmarkSyncMapCache/1000-8 10000000 235 ns/op 0 B/op 0 allocs/op | |
BenchmarkSyncMapCache/10000-8 10000000 246 ns/op 0 B/op 0 allocs/op | |
BenchmarkSyncMapCache/100000-8 5000000 264 ns/op 5 B/op 0 allocs/op | |
BenchmarkSyncMapCache/1000000-8 1000000 1378 ns/op 146 B/op 3 allocs/op | |
BenchmarkSyncMapCache/10000000-8 1000000 1939 ns/op 237 B/op 5 allocs/op | |
BenchmarkSyncMapCache/100000000-8 1000000 2090 ns/op 241 B/op 6 allocs/op |
// Debounce wraps e, preventing duplicate NamedActions from running | |
// concurrently, even from separate calls to Execute. | |
func Debounce(e Executor) Executor { | |
return debouncer{ | |
ex: e, | |
sf: new(singleflight.Group), | |
} | |
} | |
type debouncer struct { | |
ex Executor | |
sf *singleflight.Group | |
} | |
// Execute attaches a singleflight.Group to any NamedActions, effectively debouncing | |
// identical Actions if ran concurrently. | |
func (d debouncer) Execute(ctx context.Context, actions []Action) error { | |
wrapped := make([]Action, len(actions)) | |
for i, a := range actions { | |
if na, ok := a.(NamedAction); ok { | |
// compose the NamedAction with the singleflight.Group | |
wrapped[i] = debouncedAction{ | |
NamedAction: na, | |
sf: d.sf, | |
} | |
} else { | |
// otherwise, pass it through untouched | |
wrapped[i] = actions[i] | |
} | |
} | |
// delegate wrapped Actions to decorated Executor | |
return d.ex.Execute(ctx, wrapped) | |
} | |
type debouncedAction struct { | |
NamedAction | |
sf *singleflight.Group | |
} | |
func (da debouncedAction) Execute(ctx context.Context) error { | |
// map the composed Action's Execute function with the expected signature | |
// for singleflight.Group.Do. | |
fn := func() (interface{}, error) { | |
return nil, da.NamedAction.Execute(ctx) | |
} | |
_, err, _ := da.sf.Do(da.ID(), fn) | |
return err | |
} |
// An Action performs a single arbitrary task. | |
type Action interface { | |
// Execute performs the work of an Action. This method should make a best | |
// effort to be cancelled if the provided ctx is cancelled. | |
Execute(ctx context.Context) error | |
} | |
// An Executor performs a set of Actions. It is up to the implementing type | |
// to define the concurrency and open/closed failure behavior of the actions. | |
type Executor interface { | |
// Execute performs all provided actions by calling their Execute method. | |
// This method should make a best-effort to cancel outstanding actions if the | |
// provided ctx is cancelled. | |
Execute(ctx context.Context, actions []Action) error | |
} | |
// ActionFunc permits using a standalone function as an Action. | |
type ActionFunc func(context.Context) error | |
// Execute satisfies the Action interface, delegating the call to the | |
// underlying function. | |
func (fn ActionFunc) Execute(ctx context.Context) error { return fn(ctx) } |
type flow struct { | |
maxActions int64 | |
actions *semaphore.Weighted | |
calls *semaphore.Weighted | |
ex Executor | |
} | |
// ControlFlow decorates an Executor, limiting it to a maximum concurrent | |
// number of calls and actions. | |
func ControlFlow(e Executor, maxCalls, maxActions int64) Executor { | |
return &flow{ | |
maxActions: maxActions, | |
calls: semaphore.NewWeighted(maxCalls), | |
actions: semaphore.NewWeighted(maxActions), | |
ex: e, | |
} | |
} | |
// Execute attempts to acquire the semaphores for the concurrent calls and | |
// actions before delegating to the decorated Executor. If Execute is called | |
// with more actions than maxActions, an error is returned. | |
func (f *flow) Execute(ctx context.Context, actions []Action) error { | |
qty := int64(len(actions)) | |
if qty > f.maxActions { | |
return fmt.Errorf("maximum %d actions allowed", f.maxActions) | |
} | |
// limit concurrent calls to Executor.Execute | |
if err := f.calls.Acquire(ctx, 1); err != nil { | |
return err | |
} | |
defer f.calls.Release(1) | |
// limit total in-flight Actions, independent of Execute calls | |
if err := f.actions.Acquire(ctx, qty); err != nil { | |
return err | |
} | |
defer f.actions.Release(qty) | |
// delegate Actions to decorated Executor | |
return f.ex.Execute(ctx, actions) | |
} |
type metrics struct { | |
ex Executor | |
stats statCache | |
} | |
// Execute emits latency, success, and error metrics for every action delegated to the | |
// decorated Executor. For NamedActions, additional name-scoped stats are also emitted. | |
func (m *metrics) Execute(ctx context.Context, actions []Action) error { | |
wrapped := make([]Action, len(actions)) | |
global := m.stats.get("all_actions") | |
for i, a := range actions { | |
if na, ok := a.(NamedAction); ok { | |
// composed the NamedAction with global and name-scoped stats | |
wrapped[i] = namedStatAction{ | |
NamedAction: na, | |
global: global, | |
stats: m.stats.get(na.ID()), | |
} | |
} else { | |
// otherwise, just compose with global stats | |
wrapped[i] = statAction{ | |
Action: a, | |
global: global, | |
} | |
} | |
} | |
// delegate wrapped Actions to decorated Executor | |
return m.ex.Execute(ctx, wrapped) | |
} | |
type namedStatAction struct { | |
NamedAction | |
global *statSet | |
stats *statSet | |
} | |
func (a namedStatAction) Execute(ctx context.Context) error { | |
return captureMetrics(ctx, a.NamedAction, a.global, a.stats) | |
} | |
type statAction struct { | |
Action | |
global *statSet | |
} | |
func (a statAction) Execute(ctx context.Context) error { | |
return captureMetrics(ctx, a.Action, a.global, nil) | |
} | |
func captureMetrics(ctx context.Context, a Action, global, stats *statSet) error { | |
// execute the action, timing its latency | |
start := time.Now() | |
err := a.Execute(ctx) | |
lat := time.Now().Sub(start) | |
// create our counter values for error/success | |
var errored, succeeded int | |
if err != nil { | |
errored = 1 | |
} else { | |
succeeded = 1 | |
} | |
// emit the global stats | |
global.Latency(lat) | |
global.Success(succeeded) | |
global.Error(errored) | |
// if there are name-scoped stats, emit those, too | |
if stats != nil { | |
stats.Latency(lat) | |
stats.Success(succeeded) | |
stats.Error(errored) | |
} | |
return err | |
} |
// A NamedAction describes an Action that also has a unique identifier. This | |
// interface is used by the Debounce Executor to prevent duplicate actions from | |
// running concurrently. | |
type NamedAction interface { | |
Action | |
// ID returns the name for this Action. Identical actions | |
// should return the same ID value. | |
ID() string | |
} | |
type namedAction struct { | |
ActionFunc | |
name string | |
} | |
func (a namedAction) ID() string { return a.name } | |
// Named creates a NamedAction from fn, with n as its name. This function is | |
// just a helper to simplify creating NamedActions. | |
func Named(n string, fn ActionFunc) NamedAction { | |
return namedAction{ | |
ActionFunc: fn, | |
name: n, | |
} | |
} |
// Parallel is a concurrent implementation of Executor | |
type Parallel struct{} | |
// Execute performs all provided actions in concurrently, failing closed on the | |
// first error or if ctx is cancelled. | |
func (p Parallel) Execute(ctx context.Context, actions []Action) error { | |
grp, ctx := errgroup.WithContext(ctx) | |
for _, a := range actions { | |
grp.Go(p.execFn(ctx, a)) | |
} | |
return grp.Wait() | |
} | |
// execFn binds the Context and Action to the proper function signature for the | |
// errgroup.Group. | |
func (p Parallel) execFn(ctx context.Context, a Action) func() error { | |
return func() error { return a.Execute(ctx) } | |
} |
type pool struct { | |
done <-chan struct{} | |
in chan poolAction | |
} | |
// Pool creates an Executor backed by a concurrent worker pool. Up to n Actions | |
// can be in-flight simultaneously; if n is less than or equal to zero, | |
// runtime.NumCPU is used. The done channel should be closed to release | |
// resources held by the Executor. | |
func Pool(n int, done <-chan struct{}) Executor { | |
if n <= 0 { | |
n = runtime.NumCPU() | |
} | |
p := pool{done: done, in: make(chan poolAction, n)} | |
for i := 0; i < n; i++ { | |
go p.work(p.in, p.done) | |
} | |
return p | |
} | |
// Execute enqueues all Actions on the worker pool, failing closed on the | |
// first error or if ctx is cancelled. This method blocks until all enqueued | |
// Actions have returned. In the event of an error, not all Actions may be | |
// executed. | |
func (p pool) Execute(ctx context.Context, actions []Action) error { | |
qty := len(actions) | |
if qty == 0 { | |
return nil | |
} | |
ctx, cancel := context.WithCancel(ctx) | |
defer cancel() | |
res := make(chan error, qty) | |
var err error | |
var queued uint64 | |
enqueue: | |
for _, action := range actions { | |
pa := poolAction{ctx: ctx, act: action, res: res} | |
select { | |
case <-p.done: // pool is closed | |
cancel() | |
return errors.New("pool is closed") | |
case <-ctx.Done(): // ctx is closed by caller | |
err = ctx.Err() | |
break enqueue | |
case p.in <- pa: // enqueue action | |
queued++ | |
} | |
} | |
for ; queued > 0; queued-- { | |
if r := <-res; r != nil { | |
if err == nil { | |
err = r | |
cancel() | |
} | |
} | |
} | |
return err | |
} | |
func (p pool) work(in <-chan poolAction, done <-chan struct{}) { | |
for { | |
select { | |
case <-done: | |
return | |
case a := <-in: | |
a.res <- a.act.Execute(a.ctx) | |
} | |
} | |
} | |
type poolAction struct { | |
ctx context.Context | |
act Action | |
res chan<- error | |
} |
// Sequential implements Executor, performing each Action in series | |
type Sequential struct{} | |
// Execute performs each action in order, exiting on the first error or if the | |
// context is cancelled/deadlined. | |
func (Sequential) Execute(ctx context.Context, actions []Action) error { | |
for _, a := range actions { | |
select { | |
case <-ctx.Done(): | |
return ctx.Err() | |
default: | |
if err := a.Execute(ctx); err != nil { | |
return err | |
} | |
} | |
} | |
return nil | |
} |
// StatSource creates metrics with the given name. The returned metrics should be | |
// concurrency-safe. | |
type StatSource interface { | |
Timer(name string) Timer | |
Counter(name string) Counter | |
} | |
// Timer emits the duration of a particular event. The duration value is | |
// typically used to measure latencies and create histograms thereof. | |
type Timer func(duration time.Duration) | |
// Counter emits any number of events happening at a given time. For example, | |
// Counters are often used to measure RPS. | |
type Counter func(delta int) | |
// A StatSet is the cached value. | |
type statSet struct { | |
// Latency measures how long an Action takes | |
Latency Timer | |
// Success is incremented when an Action does not return an error | |
Success Counter | |
// Error is incremented when an Action results in an error | |
Error Counter | |
} | |
// Cache describes a read-through cache to obtain | |
type statCache interface { | |
// get returns a shared statSet for the given name, either from the cache or | |
// a provided StatSource. | |
get(name string) *statSet | |
} |
// mutexCache implements statCache, backed by a map and sync.RWMutex | |
type mutexCache struct { | |
src StatSource | |
mtx sync.RWMutex | |
lookup map[string]*statSet | |
} | |
func (mc *mutexCache) get(name string) *statSet { | |
// take a read lock to see if the set already exists | |
mc.mtx.RLock() | |
set, ok := mc.lookup[name] | |
mc.mtx.RUnlock() | |
if ok { // the set exists, return it | |
return set | |
} | |
// need to take a write lock to update the map | |
mc.mtx.Lock() | |
// While waiting for the write lock, another goroutine may have created the | |
// set. Here, we check again after obtaining the lock before making a new one | |
if set, ok = mc.lookup[name]; !ok { | |
set = newStatSet(mc.src, name) | |
mc.lookup[name] = set | |
} | |
mc.mtx.Unlock() | |
return set | |
} |
// syncMapCache implements statCache, backed by a sync.Map | |
type syncMapCache struct { | |
src StatSource | |
lookup sync.Map | |
} | |
func (smc *syncMapCache) get(name string) *statSet { | |
val, _ := smc.lookup.Load(name) | |
if set, ok := val.(*statSet); ok { | |
return set | |
} | |
// create a new statSet, but don't store it if one was added since the last | |
// load. This is not ideal since we can't atomically create the set and | |
// write it. | |
set, _ := smc.lookup.LoadOrStore(name, newStatSet(smc.src, name)) | |
return set.(*statSet) | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment