Skip to content

Instantly share code, notes, and snippets.

@dmage
Created January 26, 2017 13:53
Show Gist options
  • Save dmage/536553584eb9d45a250e156af0e16457 to your computer and use it in GitHub Desktop.
Save dmage/536553584eb9d45a250e156af0e16457 to your computer and use it in GitHub Desktop.
// Package worker provides observable collections of runnable tasks.
package worker
import (
"context"
"sync"
)
// Interface defines a general worker. Both a single runnable task and a
// collection of such tasks are expected to satisfy this contract.
type Interface interface {
// Run executes the worker with the given context and reports the error,
// if any, produced by the execution.
Run(ctx context.Context) error
// Each iterates over workers. If the worker can be represented as
// several workers, Each should be called on each of those workers.
// Otherwise the worker itself should be passed to the callback.
//
// If the callback returns false, this is considered an abnormal
// situation and the iteration should be aborted.
//
// Each returns true if the callback has been successfully called for
// every worker.
Each(callback func(Interface) bool) bool
}
// Func adapts an ordinary function so that it can be used wherever a worker
// (Interface) is expected.
type Func func(ctx context.Context) error

// Run implements Interface.Run: it calls fn with ctx and hands back whatever
// error fn produced.
func (fn Func) Run(ctx context.Context) error {
	err := fn(ctx)
	return err
}

// Each implements Interface.Each. A Func is a single, indivisible worker, so
// the visitor is invoked exactly once with fn itself and its verdict becomes
// the result.
func (fn Func) Each(visitor func(Interface) bool) bool {
	var self Interface = fn
	return visitor(self)
}
// Sequential is an ordered collection of workers that are executed one after
// another.
type Sequential []Interface

// Run executes the workers in slice order. The first non-nil error stops the
// run: it is returned immediately and the remaining workers are never
// started. When every worker succeeds, Run returns nil.
func (ws Sequential) Run(ctx context.Context) error {
	for i := range ws {
		if err := ws[i].Run(ctx); err != nil {
			return err
		}
	}
	return nil
}

// Each implements Interface.Each by delegating to the Each method of every
// contained worker, in order. Iteration is aborted, and false returned, as
// soon as one of those calls reports false.
func (ws Sequential) Each(visitor func(Interface) bool) bool {
	for i := 0; i < len(ws); i++ {
		if ok := ws[i].Each(visitor); !ok {
			return false
		}
	}
	return true
}
// Parallel is a collection of workers that are executed concurrently.
type Parallel []Interface

// Run starts every worker in its own goroutine and waits until all of them
// have finished. If one or more workers fail, the first error to be recorded
// is returned; which one that is depends on scheduling. Run returns nil when
// no worker reported an error.
func (ws Parallel) Run(ctx context.Context) error {
	var (
		pending sync.WaitGroup
		record  sync.Once
		failure error
	)
	for i := range ws {
		pending.Add(1)
		go func(task Interface) {
			defer pending.Done()
			if err := task.Run(ctx); err != nil {
				// Only the first failing worker gets to store its error.
				record.Do(func() { failure = err })
			}
		}(ws[i])
	}
	pending.Wait()
	return failure
}

// Each implements Interface.Each by delegating to the Each method of every
// contained worker, in slice order. Iteration is aborted, and false
// returned, as soon as one of those calls reports false.
func (ws Parallel) Each(visitor func(Interface) bool) bool {
	for i := 0; i < len(ws); i++ {
		if ok := ws[i].Each(visitor); !ok {
			return false
		}
	}
	return true
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment