Created
January 26, 2017 13:53
-
-
Save dmage/536553584eb9d45a250e156af0e16457 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Package worker provides observable collections of runnable tasks. | |
package worker | |
import ( | |
"context" | |
"sync" | |
) | |
// Interface defines a general worker. | |
type Interface interface { | |
// Run executes the worker. | |
Run(ctx context.Context) error | |
// Each iterates over workers. If the worker could be could be represented | |
// as several workers, the method should be called on each such worker. | |
// Otherwise the worker should be passed to the callback. | |
// | |
// If callback returns false, it considered as an abnormal situation and | |
// iteration should be aborted. | |
// | |
// Returns true if the callback has been successfully called for each | |
// worker. | |
Each(callback func(Interface) bool) bool | |
} | |
// Func type is an adapter to allow the use of ordinary functions as a worker. | |
type Func func(ctx context.Context) error | |
// Run implements Interface.Run by executing the function f. | |
func (f Func) Run(ctx context.Context) error { | |
return f(ctx) | |
} | |
// Each implements Interface.Each. | |
func (f Func) Each(visitor func(Interface) bool) bool { | |
return visitor(f) | |
} | |
// Sequential is a collection of workers that should be executed sequentially. | |
type Sequential []Interface | |
// Run executes tasks sequentially. If some worker returns an error, next | |
// workers will not be executed and the error will be returned. | |
func (s Sequential) Run(ctx context.Context) error { | |
for _, w := range s { | |
err := w.Run(ctx) | |
if err != nil { | |
return err | |
} | |
} | |
return nil | |
} | |
// Each implements Interface.Each. | |
func (s Sequential) Each(visitor func(Interface) bool) bool { | |
for _, w := range s { | |
if !w.Each(visitor) { | |
return false | |
} | |
} | |
return true | |
} | |
// Parallel is a collection of workers that should be executed concurrently. | |
type Parallel []Interface | |
// Run executes tasks concurrently. If some workers return an error, one of | |
// the catched errors will be returned. | |
func (p Parallel) Run(ctx context.Context) error { | |
wg := sync.WaitGroup{} | |
wg.Add(len(p)) | |
mu := sync.Mutex{} | |
var firstError error | |
for _, w := range p { | |
go func(w Interface) { | |
defer wg.Done() | |
err := w.Run(ctx) | |
if err != nil { | |
mu.Lock() | |
defer mu.Unlock() | |
if firstError == nil { | |
firstError = err | |
} | |
} | |
}(w) | |
} | |
wg.Wait() | |
return firstError | |
} | |
// Each implements Interface.Each. | |
func (p Parallel) Each(visitor func(Interface) bool) bool { | |
for _, w := range p { | |
if !w.Each(visitor) { | |
return false | |
} | |
} | |
return true | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment