Skip to content

Instantly share code, notes, and snippets.

@thinktainer
Last active March 7, 2018 13:08
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save thinktainer/c608c4c9c81a4755205dece37bcac4b3 to your computer and use it in GitHub Desktop.
Save thinktainer/c608c4c9c81a4755205dece37bcac4b3 to your computer and use it in GitHub Desktop.
parallel processing loop (generic / interface{} based)
package s3repo
import (
"context"
"log"
"sync"
"github.com/pkg/errors"
)
// processItemsParallel fetches every element of items with getItem, running at
// most numGoRoutines fetches concurrently, and passes each fetched result to
// handleItem. handleItem is only ever called from a single goroutine, so it
// sees results one at a time (in completion order, not input order).
//
// Processing is fail-fast: the first error from getItem (wrapped with
// "fetching item") or from handleItem (wrapped with "handling item") is logged
// and returned immediately, and the context passed to both callbacks is
// cancelled so that outstanding work can stop. Callbacks that are already
// running when the function returns are not waited for; they are only
// signalled through their context, and all helper goroutines exit once those
// callbacks return.
//
// If c is cancelled before every item has been fetched and handed over, the
// context's error is returned instead of nil, so a partially processed batch
// is never reported as success. Values of numGoRoutines below 1 are treated
// as 1.
func processItemsParallel(c context.Context, numGoRoutines int, items []interface{}, getItem func(context.Context, interface{}) (interface{}, error), handleItem func(context.Context, interface{}) error) error {
	// A zero-capacity token bucket would block the dispatcher forever and a
	// negative capacity panics in make, so fall back to sequential fetching.
	if numGoRoutines < 1 {
		numGoRoutines = 1
	}

	ctx, cancel := context.WithCancel(c)
	// Cancelling on every return path is what releases the helper goroutines
	// below once the caller has its answer.
	defer cancel()

	// errChan holds the first reported error. report never blocks: later
	// errors are dropped, so no goroutine can get stuck on a full buffer
	// after the caller has already returned.
	errChan := make(chan error, 1)
	report := func(err error) {
		select {
		case errChan <- err:
		default:
		}
	}

	resChan := make(chan interface{})
	done := make(chan struct{})
	tokenBucket := make(chan struct{}, numGoRoutines)

	// Consumer: handles results sequentially. done is closed (not sent on) so
	// this goroutine can finish even when nobody is listening any more.
	go func() {
		defer close(done)
		for r := range resChan {
			if err := handleItem(ctx, r); err != nil {
				report(errors.Wrap(err, "handling item"))
				return
			}
		}
	}()

	// Dispatcher: starts one fetch goroutine per item, bounded by tokenBucket,
	// and closes resChan once every started fetch has finished.
	go func() {
		var wg sync.WaitGroup
	LOOP:
		for _, item := range items {
			select {
			case <-ctx.Done():
				// At least this item is being skipped; make sure the caller
				// sees an error unless a more specific one is already queued.
				report(ctx.Err())
				break LOOP
			case tokenBucket <- struct{}{}:
			}

			wg.Add(1)
			go func(i interface{}) {
				defer wg.Done()
				defer func() { <-tokenBucket }()

				result, err := getItem(ctx, i)
				if err != nil {
					report(errors.Wrap(err, "fetching item"))
					return
				}

				// The consumer may have stopped after an error, so never
				// block unconditionally on handing over the result.
				select {
				case resChan <- result:
				case <-ctx.Done():
					report(ctx.Err())
				}
			}(item)
		}
		wg.Wait()
		close(resChan)
	}()

	var err error
	select {
	case <-done:
		// The consumer can finish right after an error was queued; pick it
		// up so it is not lost when both channels became ready together.
		select {
		case err = <-errChan:
		default:
		}
	case err = <-errChan:
	}
	if err != nil {
		log.Printf("error: %v", err)
	}
	return err
}
@thinktainer
Copy link
Author

parallel processing with error handling (fail fast)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment