Skip to content

Instantly share code, notes, and snippets.

@Jguer
Last active September 5, 2023 12:28
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save Jguer/909447b7763fadee01f90775cdf57953 to your computer and use it in GitHub Desktop.
Save Jguer/909447b7763fadee01f90775cdf57953 to your computer and use it in GitHub Desktop.
Fan-Out with Context in Go
package main
import (
"context"
"errors"
"fmt"
"math/rand"
"os"
"os/exec"
"os/signal"
"runtime"
"strings"
"sync"
"syscall"
)
// ErrProcessing reports that a single input event could not be processed.
// It carries the event that failed together with the underlying cause.
type ErrProcessing struct {
	event string // input value whose processing failed
	inner error  // underlying cause; may be nil
}

// Error implements the error interface. When an underlying cause is present
// it is appended as extra context; a nil cause is tolerated so that calling
// Error never panics.
func (e *ErrProcessing) Error() string {
	if e.inner == nil {
		return fmt.Sprintf("error processing %s", e.event)
	}
	return fmt.Sprintf("error processing %s. extra context: %s", e.event, e.inner.Error())
}

// Unwrap returns the underlying cause (possibly nil) so that errors.Is and
// errors.As can inspect the wrapped error.
func (e *ErrProcessing) Unwrap() error {
	return e.inner
}
// main runs the fan-out pipeline: it starts one worker per CPU, feeds them
// the input words, collects successes and failures until both output
// channels are closed, and finally prints a summary. SIGINT/SIGTERM cancel
// the shared context.
func main() {
	workerCount := runtime.NumCPU()
	fmt.Fprintln(os.Stdout, "Running on", workerCount, "goroutines.")

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	jobs := make(chan string)        // input values
	results := make(chan string, 10) // output values
	failures := make(chan error, 10) // output errors

	// Translate the first SIGINT/SIGTERM into a context cancellation.
	go func() {
		sigs := make(chan os.Signal, 1)
		signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
		<-sigs
		cancel()
	}()

	// Start the consumers.
	var workers sync.WaitGroup
	workers.Add(workerCount)
	for id := 0; id < workerCount; id++ {
		go fanoutWorker(ctx, &workers, jobs, id, results, failures)
	}

	// Start the producer.
	go inputData(jobs)

	// Close both output channels once every worker has returned, which lets
	// the collection loop below terminate.
	go func() {
		workers.Wait()
		close(results)
		close(failures)
	}()

	okNames := []string{}  // collected output values
	badNames := []string{} // input values that failed

	// A closed channel is replaced by nil so its select case is never chosen
	// again; the loop ends when both channels have been closed.
	for results != nil || failures != nil {
		select {
		case value, open := <-results:
			if !open {
				results = nil
				continue
			}
			fmt.Fprintf(os.Stdout, "Success: %s\n", value)
			okNames = append(okNames, value)
		case err, open := <-failures:
			if !open {
				failures = nil
				continue
			}
			var procErr *ErrProcessing
			if !errors.As(err, &procErr) {
				fmt.Fprintln(os.Stderr, "unhandled error:", err)
				continue
			}
			badNames = append(badNames, procErr.event)
		}
	}

	fmt.Fprintf(os.Stdout, "Successful (%d): %s\nFailed (%d): %s",
		len(okNames), strings.Join(okNames, ", "), len(badNames), strings.Join(badNames, ", "))
}
// inputData sends every whitespace-separated word of Names on inChan, in
// order, and then closes inChan to signal that no more input will arrive.
// Each send blocks until a receiver takes the value.
func inputData(inChan chan<- string) {
	words := strings.Fields(Names)
	for i := 0; i < len(words); i++ {
		inChan <- words[i]
	}
	close(inChan)
}
// fanoutWorker receives names from inChan and "processes" each one by running
// an external `sleep` command for a random 1–4 seconds. A name whose command
// succeeds is sent on valOut; otherwise an *ErrProcessing holding the name and
// the command error is sent on errOut. The worker returns when ctx is
// cancelled or inChan is closed, and always calls wg.Done on exit.
// routineName is accepted but not used by the body.
func fanoutWorker(ctx context.Context, wg *sync.WaitGroup, inChan <-chan string,
	routineName int, valOut chan<- string, errOut chan<- error) {
	defer wg.Done()

	for {
		select {
		case <-ctx.Done():
			// Cancellation requested (e.g. by signal handling in the caller).
			return
		case item, open := <-inChan:
			if !open {
				// Producer closed the channel: no more work.
				return
			}

			// The command is bound to ctx, so cancellation also stops a
			// sleep that is already running.
			seconds := 1 + rand.Intn(4)
			err := exec.CommandContext(ctx, "sleep", fmt.Sprint(seconds)).Run()
			if err != nil {
				errOut <- &ErrProcessing{event: item, inner: err}
				continue
			}
			valOut <- item
		}
	}
}
// Names is the space-separated list of sample words used as input data;
// inputData splits it with strings.Fields and sends each word to the workers.
const Names = "dynasty regret appalling creative accessories forlornness bazooka pattern first glow crackdown daughter addictive goon beautiful grave amusement pitch peepshow accountable bloat cyanide fork fight axiom biggest enjoy disfigurement teen foreign company lavender owl hooligan blabbermouth blockade frying melody empire apocalyptic hooves terror believable vibrator sentinel famous convulsion flirtation system heavyhearted"
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment