Skip to content

Instantly share code, notes, and snippets.

@supershabam
Created March 4, 2015 18:43
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save supershabam/8405f99901b0d2e53ea8 to your computer and use it in GitHub Desktop.
Save supershabam/8405f99901b0d2e53ea8 to your computer and use it in GitHub Desktop.
//go:generate pipeliner map(func(string) (string, error) concurrently as concErrMap into conc_err_map.go
func concErrMap(concurrency int, fn func(string) (string, error), in <-chan string) (<-chan string, <-chan error) {
out := make(chan string)
errc := make(chan error, 1)
done := make(chan struct{})
once := sync.Once{}
go func() {
defer close(out)
var outerErr error
defer func() {
errc <- outerErr
}()
wg := sync.WaitGroup{}
wg.Add(concurrency)
for i := 0; i < concurrency; i++ {
go func() {
defer wg.Done()
for {
select {
case <-done:
return
case item, ok := <-in:
if !ok {
return // end of channel
}
t, err := fn(item)
if err != nil {
once.Do(func() {
outerErr = err
close(done)
})
return
}
out <- t
}
}
}()
}
wg.Wait()
}()
return out, errc
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment