Created
June 11, 2015 16:51
-
-
Save zeisss/a2bad0bdb9f2f921933d to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import "time" | |
import "fmt" | |
import "math/rand" | |
// parallel performs the given actions with `n` actions in parallel. | |
// Make `n == len(actions)` to perform all actions in parallel. | |
// | |
// Arguments | |
// actions - The functions you want to execute. Error results will be reported to the errors channel. | |
// n - the number of parallel actions to execute in maximum. Must be positive. | |
// errors - Optional channel to report errors back to. Will be closed. | |
// cancel - Optional channel to signal abortion of processing. | |
func parallel(actions []func() error, n int, errors chan<- error, cancel <-chan struct{}) { | |
if len(actions) == 0 { | |
return | |
} | |
if n < 1 { | |
panic("Invalid n-value for rolling()") | |
} | |
// Fill a queue with n tokens to allow working | |
workerToken := make(chan struct{}, n) | |
workerReturn := make(chan struct{}) | |
doneChan := make(chan struct{}) | |
for i := 0; i < n; i++ { | |
workerToken <- struct{}{} | |
} | |
for _, action := range actions { | |
go func(action func() error, workerToken <-chan struct{}) { | |
defer func() { | |
doneChan <- struct{}{} | |
}() | |
token, stillOpen := <-workerToken | |
if !stillOpen { | |
// workerToken gets closed when the job was cancelled early | |
return | |
} | |
// Make sure the token goes back | |
defer func() { | |
workerReturn <- token | |
}() | |
err := action() | |
if err != nil && errors != nil { | |
errors <- err | |
} | |
}(action, workerToken) | |
} | |
canceled := false | |
done := 0 | |
for { | |
select { | |
// The "client" wants to cancel processing | |
case <-cancel: | |
canceled = true | |
close(workerToken) | |
for range workerToken { | |
// drain it | |
} | |
// A job was processed and we get a success. | |
// Requeue the token as long as we are not canceled | |
case token := <-workerReturn: | |
if !canceled { | |
workerToken <- token | |
} | |
// A job was finished | |
case <-doneChan: | |
done++ | |
} | |
if done == len(actions) { | |
break | |
} | |
} | |
close(doneChan) | |
close(workerReturn) | |
if errors != nil { | |
close(errors) | |
} | |
} | |
func main() { | |
// Build a list of 1000 tasks which take 100ms each | |
actions := []func() error{} | |
for i := 0; i < 1000; i++ { | |
func(i int) { | |
actions = append(actions, func() error { | |
fmt.Printf("Hello World! I am No %d\n", i) | |
time.Sleep(100 * time.Millisecond) | |
if rand.Intn(100) < 20 { | |
return fmt.Errorf("I died. I am No %d", i) | |
} | |
return nil | |
}) | |
}(i) | |
} | |
errors := make(chan error) | |
cancel := make(chan struct{}) | |
go parallel(actions, 20, errors, cancel) | |
for err := range errors { | |
fmt.Println(err) | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment