Skip to content

Instantly share code, notes, and snippets.

@crhntr
Created December 11, 2022 21:41
Show Gist options
  • Save crhntr/295450826786f9aec79df871261bc1d8 to your computer and use it in GitHub Desktop.
Save crhntr/295450826786f9aec79df871261bc1d8 to your computer and use it in GitHub Desktop.
package main
import (
"context"
"os"
"os/signal"
"sync"
)
// mapEachWithWorkers starts workers goroutines that each receive values from
// in, call fn(ctx, value), and send the result on out when fn reports true.
// Results for which fn reports false are skipped.
//
// The call itself does not block: it returns as soon as the goroutines are
// started. A background goroutine closes out after in has been closed and
// drained and every worker has returned, so callers can simply range over out.
// With more than one worker the order of results on out is not guaranteed.
// If workers <= 0 no worker is started, in is never read, and out is closed
// right away.
//
// Cancellation: once ctx is done a worker no longer waits indefinitely for a
// receiver on out. A result that cannot be delivered is dropped, and the
// worker keeps draining in (still calling fn, which receives the cancelled
// ctx) so that a producer blocked on in is released and out is eventually
// closed. After cancellation a consumer that is still reading may therefore
// miss some results.
func mapEachWithWorkers[In any, Out any](ctx context.Context, workers int, in <-chan In, out chan<- Out, fn func(context.Context, In) (Out, bool)) {
	var wg sync.WaitGroup
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for input := range in {
				output, ok := fn(ctx, input)
				if !ok {
					continue
				}
				select {
				case out <- output:
				case <-ctx.Done():
					// Nobody may be receiving any more; drop the result
					// instead of blocking forever, and keep draining in.
				}
			}
		}()
	}
	// Close out only after every worker is finished so no send can hit a
	// closed channel.
	go func() {
		wg.Wait()
		close(out)
	}()
}
// sendElementsAndCloseOnContextCancelOrNotifyInterrupt sends the elements of
// objects on in, in slice order, and closes in when it returns. It blocks
// until every element has been sent or a stop condition occurs.
//
// It stops early, leaving the remaining elements unsent, when ctx is done or
// when one of sigs is delivered to the process. Both conditions are observed
// before each send and also while a send is blocked waiting for a receiver.
//
// Signal handling is registered only when at least one signal is passed,
// because signal.Notify with an empty list would subscribe to every incoming
// signal. The registration is removed again before the function returns.
func sendElementsAndCloseOnContextCancelOrNotifyInterrupt[T any](ctx context.Context, objects []T, in chan<- T, sigs ...os.Signal) {
	defer close(in)

	// A nil channel is never ready in a select, so when no signals were
	// requested the signal cases below are simply inert.
	var exit chan os.Signal
	if len(sigs) > 0 {
		exit = make(chan os.Signal, 1)
		signal.Notify(exit, sigs...)
		defer signal.Stop(exit)
	}

	for _, object := range objects {
		// A stop request that has already happened wins over a ready
		// receiver: select picks randomly among ready cases, so check the
		// stop conditions on their own first.
		select {
		case <-ctx.Done():
			return
		case <-exit:
			return
		default:
		}

		// Send, but stay interruptible while waiting for a receiver.
		select {
		case <-ctx.Done():
			return
		case <-exit:
			return
		case in <- object:
		}
	}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment