Skip to content

Instantly share code, notes, and snippets.

@cideM
Last active September 8, 2022 13:40
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save cideM/aab27b385643193975dfe7c61766f923 to your computer and use it in GitHub Desktop.
Save cideM/aab27b385643193975dfe7c61766f923 to your computer and use it in GitHub Desktop.
Add error handling
diff --git a/main.go b/main.go
index 3888a36..8794caa 100644
--- a/main.go
+++ b/main.go
@@ -2,6 +2,7 @@ package main
import (
"context"
+ "errors"
"log"
"runtime"
"strings"
@@ -28,11 +29,13 @@ func producer(ctx context.Context, strings []string) (<-chan string, error) {
return outChannel, nil
}
-func transformToLower(ctx context.Context, values <-chan string) (<-chan string, error) {
+func transformToLower(ctx context.Context, values <-chan string) (<-chan string, <-chan error, error) {
outChannel := make(chan string)
+ errorChannel := make(chan error)
go func() {
defer close(outChannel)
+ defer close(errorChannel)
select {
case <-ctx.Done():
@@ -48,14 +51,16 @@ func transformToLower(ctx context.Context, values <-chan string) (<-chan string,
}
}()
- return outChannel, nil
+ return outChannel, errorChannel, nil
}
-func transformToTitle(ctx context.Context, values <-chan string) (<-chan string, error) {
+func transformToTitle(ctx context.Context, values <-chan string) (<-chan string, <-chan error, error) {
outChannel := make(chan string)
+ errorChannel := make(chan error)
go func() {
defer close(outChannel)
+ defer close(errorChannel)
select {
case <-ctx.Done():
@@ -64,22 +69,31 @@ func transformToTitle(ctx context.Context, values <-chan string) (<-chan string,
if ok {
log.Println("transformToTitle input: ", s)
time.Sleep(time.Second * 3)
- outChannel <- strings.Title(s)
+ if s == "foo" {
+ errorChannel <- errors.New("error in transformToTitle")
+ } else {
+ outChannel <- strings.ToTitle(s)
+ }
} else {
return
}
}
}()
- return outChannel, nil
+ return outChannel, errorChannel, nil
}
-func sink(ctx context.Context, values <-chan string) {
+func sink(ctx context.Context, cancel context.CancelFunc, values <-chan string, errors <-chan error) {
for {
select {
case <-ctx.Done():
log.Print(ctx.Err().Error())
return
+ case err, ok := <-errors:
+ if ok {
+ cancel()
+ log.Print(err.Error())
+ }
case val, ok := <-values:
if ok {
log.Println("Sink: ", val)
@@ -102,28 +116,32 @@ func main() {
}
stage1Channels := []<-chan string{}
+ errors := []<-chan error{}
for i := 0; i < runtime.NumCPU(); i++ {
- lowerCaseChannel, err := transformToLower(ctx, outputChannel)
+ lowerCaseChannel, lowerCaseErrors, err := transformToLower(ctx, outputChannel)
if err != nil {
log.Fatal(err)
}
stage1Channels = append(stage1Channels, lowerCaseChannel)
+ errors = append(errors, lowerCaseErrors)
}
stage1Merged := mergeStringChans(ctx, stage1Channels...)
stage2Channels := []<-chan string{}
for i := 0; i < runtime.NumCPU(); i++ {
- titleCaseChannel, err := transformToTitle(ctx, stage1Merged)
+ titleCaseChannel, titleCaseErrors, err := transformToTitle(ctx, stage1Merged)
if err != nil {
log.Fatal(err)
}
stage2Channels = append(stage2Channels, titleCaseChannel)
+ errors = append(errors, titleCaseErrors)
}
stage2Merged := mergeStringChans(ctx, stage2Channels...)
- sink(ctx, stage2Merged)
+ errorsMerged := mergeErrorChans(ctx, errors...)
+ sink(ctx, cancel, stage2Merged, errorsMerged)
}
func mergeStringChans(ctx context.Context, cs ...<-chan string) <-chan string {
@@ -153,3 +171,31 @@ func mergeStringChans(ctx context.Context, cs ...<-chan string) <-chan string {
return out
}
+
+// mergeErrorChans fans the error channels in cs into a single channel.
+//
+// One forwarding goroutine is started per input channel. Each forwarder
+// passes errors on to the returned channel until its input is closed or
+// ctx is done. The returned channel is unbuffered and is closed once every
+// forwarder has returned.
+//
+// NOTE(review): a forwarder that stops because of ctx does not drain its
+// input, so producers presumably need to watch ctx themselves to avoid
+// blocking on a send nobody will receive — confirm against the senders.
+func mergeErrorChans(ctx context.Context, cs ...<-chan error) <-chan error {
+ var wg sync.WaitGroup
+ out := make(chan error)
+
+ // output forwards everything received from c to out. It gives up as soon
+ // as ctx is done, so a missing reader on out cannot block it forever.
+ output := func(c <-chan error) {
+ defer wg.Done()
+ for n := range c {
+ select {
+ case out <- n:
+ case <-ctx.Done():
+ return
+ }
+ }
+ }
+
+ // Account for every forwarder up front, before any of them can call
+ // wg.Done.
+ wg.Add(len(cs))
+ for _, c := range cs {
+ go output(c)
+ }
+
+ // Wait in a separate goroutine so the merged channel is returned right
+ // away. out is closed only after all forwarders have finished; closing
+ // it earlier would risk a send on a closed channel.
+ go func() {
+ wg.Wait()
+ close(out)
+ }()
+
+ return out
+}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment