Last active
September 5, 2023 12:28
-
-
Save Jguer/909447b7763fadee01f90775cdf57953 to your computer and use it in GitHub Desktop.
Fan-Out with Context in Go
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"context" | |
"errors" | |
"fmt" | |
"math/rand" | |
"os" | |
"os/exec" | |
"os/signal" | |
"runtime" | |
"strings" | |
"sync" | |
"syscall" | |
) | |
// ErrProcessing reports that a single input event could not be processed.
// It carries the event that failed together with the underlying cause.
type ErrProcessing struct {
	event string // input value whose processing failed
	inner error  // underlying cause; may be nil
}

// Error implements the error interface. The message names the failed event
// and, when an underlying cause is present, appends its text.
func (e *ErrProcessing) Error() string {
	// Guard against a missing cause: calling Error on a nil interface panics.
	if e.inner == nil {
		return fmt.Sprintf("error processing %s", e.event)
	}
	return fmt.Sprintf("error processing %s. extra context: %s", e.event, e.inner.Error())
}

// Unwrap returns the underlying cause (possibly nil) so that errors.Is and
// errors.As can look through an ErrProcessing.
func (e *ErrProcessing) Unwrap() error {
	return e.inner
}
func main() { | |
numOfWorkers := runtime.NumCPU() | |
fmt.Fprintln(os.Stdout, "Running on", numOfWorkers, "goroutines.") | |
var ( | |
ctx, cancel = context.WithCancel(context.Background()) | |
inChan = make(chan string) // Input values | |
outChanValues = make(chan string, 10) // Output values | |
outChanErrors = make(chan error, 10) // Output errors | |
succeeded = []string{} // Collected Output values | |
failed = []string{} // Collected Input values that failed | |
) | |
defer cancel() | |
go func() { | |
sigterm := make(chan os.Signal, 1) | |
signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM) | |
<-sigterm | |
cancel() | |
}() | |
// Create consumers | |
wg := &sync.WaitGroup{} // Waitgroup for workers | |
wg.Add(numOfWorkers) | |
for s := 0; s < numOfWorkers; s++ { | |
go fanoutWorker(ctx, wg, inChan, s, outChanValues, outChanErrors) | |
} | |
// Input data | |
go inputData(inChan) | |
go func() { | |
// Once input data is treated and all workers have returned closed the output channel | |
wg.Wait() | |
close(outChanValues) | |
close(outChanErrors) | |
}() | |
for { | |
select { | |
case value, ok := <-outChanValues: | |
if ok { | |
fmt.Fprintf(os.Stdout, "Success: %s\n", value) | |
succeeded = append(succeeded, value) | |
} else { | |
outChanValues = nil | |
} | |
case err, ok := <-outChanErrors: | |
if ok { | |
var errP *ErrProcessing | |
if errors.As(err, &errP) { | |
failed = append(failed, errP.event) | |
} else { | |
fmt.Fprintln(os.Stderr, "unhandled error:", err) | |
} | |
} else { | |
outChanErrors = nil | |
} | |
} | |
if outChanValues == nil && outChanErrors == nil { | |
break | |
} | |
} | |
fmt.Fprintf(os.Stdout, "Successful (%d): %s\nFailed (%d): %s", | |
len(succeeded), strings.Join(succeeded, ", "), len(failed), strings.Join(failed, ", ")) | |
} | |
// Insert data into the input channel and signal it's done | |
func inputData(inChan chan<- string) { | |
for _, v := range strings.Fields(Names) { | |
inChan <- v | |
} | |
close(inChan) | |
} | |
func fanoutWorker(ctx context.Context, wg *sync.WaitGroup, inChan <-chan string, | |
routineName int, valOut chan<- string, errOut chan<- error) { | |
defer wg.Done() | |
for { | |
select { | |
case <-ctx.Done(): // Signal Handling | |
return | |
case name, ok := <-inChan: // Process input | |
if !ok { | |
return | |
} | |
// Random sleep between 1-5s handled as syscalls. | |
cmd := exec.CommandContext(ctx, "sleep", fmt.Sprint(rand.Intn(5-1)+1)) | |
if err := cmd.Run(); err == nil { | |
valOut <- name | |
} else { | |
errOut <- &ErrProcessing{event: name, inner: err} | |
} | |
} | |
} | |
} | |
// Names is the space-separated list of sample words used as input data.
const Names = "dynasty regret appalling creative accessories forlornness bazooka pattern first glow " +
	"crackdown daughter addictive goon beautiful grave amusement pitch peepshow accountable " +
	"bloat cyanide fork fight axiom biggest enjoy disfigurement teen foreign " +
	"company lavender owl hooligan blabbermouth blockade frying melody empire apocalyptic " +
	"hooves terror believable vibrator sentinel famous convulsion flirtation system heavyhearted"
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment