Skip to content

Instantly share code, notes, and snippets.

@zeisss
Created June 11, 2015 16:51
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save zeisss/a2bad0bdb9f2f921933d to your computer and use it in GitHub Desktop.
Save zeisss/a2bad0bdb9f2f921933d to your computer and use it in GitHub Desktop.
package main
import "time"
import "fmt"
import "math/rand"
// parallel runs the given actions concurrently, with at most n of them
// executing at any moment. Pass n == len(actions) to run everything at once.
// It blocks until every action that was started has returned.
//
// Arguments
// actions - The functions to execute. Non-nil error results are sent to the
//           errors channel.
// n       - Maximum number of actions running at the same time. Must be
//           positive; parallel panics otherwise (unless actions is empty).
// errors  - Optional (may be nil) channel that receives the actions' errors.
//           It is closed before parallel returns, including when actions is
//           empty. Keep receiving until it is closed: a worker blocks while
//           it delivers an error.
// cancel  - Optional (may be nil) channel to abort processing. Sending a value
//           or closing the channel both work. Actions that have not started
//           yet are skipped; actions already running are allowed to finish.
func parallel(actions []func() error, n int, errors chan<- error, cancel <-chan struct{}) {
	// Close errors on every return path so a caller ranging over the channel
	// always terminates, even when there is nothing to do.
	if errors != nil {
		defer close(errors)
	}
	if len(actions) == 0 {
		return
	}
	if n < 1 {
		panic("parallel: n must be at least 1")
	}
	// More slots than actions could never be used.
	if n > len(actions) {
		n = len(actions)
	}

	// slots is a counting semaphore: a worker occupies one element of the
	// buffer while its action runs, so at most n actions run concurrently.
	slots := make(chan struct{}, n)

dispatch:
	for _, action := range actions {
		// Look at cancel on its own first. A select with several ready cases
		// picks one at random, so without this check an action could still
		// be started after cancellation was already signalled.
		select {
		case <-cancel:
			break dispatch
		default:
		}

		// Wait for a free slot, but stay responsive to cancellation.
		// A nil cancel channel never becomes ready, which makes it optional.
		select {
		case <-cancel:
			break dispatch
		case slots <- struct{}{}:
		}

		go func(action func() error) {
			// Free the slot only after the error (if any) was delivered, so
			// the final wait below also covers the send on errors.
			defer func() { <-slots }()
			if err := action(); err != nil && errors != nil {
				errors <- err
			}
		}(action)
	}

	// Filling the semaphore completely is only possible once every worker has
	// released its slot, i.e. once all started actions are finished.
	for i := 0; i < n; i++ {
		slots <- struct{}{}
	}
}
// main is a small demonstration of parallel: it queues 1000 tasks that each
// print a greeting, sleep for 100ms and fail roughly one time in five, runs
// them 20 at a time and prints every reported error until the error channel
// is closed.
func main() {
	const taskCount = 1000

	tasks := make([]func() error, 0, taskCount)
	for id := 0; id < taskCount; id++ {
		id := id // per-iteration copy, so each closure reports its own number
		tasks = append(tasks, func() error {
			fmt.Printf("Hello World! I am No %d\n", id)
			time.Sleep(100 * time.Millisecond)
			if rand.Intn(100) >= 20 {
				return nil
			}
			return fmt.Errorf("I died. I am No %d", id)
		})
	}

	failures := make(chan error)
	abort := make(chan struct{}) // never signalled in this demo

	// Run the pool in the background; this goroutine drains the error
	// channel, which ends once parallel closes it.
	go parallel(tasks, 20, failures, abort)
	for failure := range failures {
		fmt.Println(failure)
	}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment