Skip to content

Instantly share code, notes, and snippets.

@arschles
Last active August 19, 2023 10:10
Show Gist options
  • Save arschles/ee10bd14ad62a9b5b2efc051eab36b38 to your computer and use it in GitHub Desktop.
Save arschles/ee10bd14ad62a9b5b2efc051eab36b38 to your computer and use it in GitHub Desktop.
Fan-out and Fan-in with Go
package fans
// this code was inspired by https://talks.golang.org/2012/concurrency.slide#36
//
// it uses the context package (which wasn't available when that presentation was made) instead of a timer to
// do timeouts
import (
"context"
"time"
"sync"
)
func fn1(i int) int {
// probably I/O will happen here
return i + 1
}
func fn2(i int) int {
// probably I/O will happen here
return i + 2
}
func run() []int {
ctx := context.Background()
// return to the caller within 500 milliseconds, regardless of how long the functions take
ctx, done := context.WithTimeout(ctx, 500*time.Millisecond)
// when this function returns, stop the context. if you do this, <-ctx.Done() will be
// closed. that's important above if fn1 or fn2 receive on <-ctx.Done()
defer done()
// this is the channel that all functions will fan-in to
ch := make(chan int)
// this wait group is so that we can get notified if the functions all are done _before_
// the context times out
var wg sync.WaitGroup
/////
// fan out
/////
wg.Add(1)
go func() {
defer wg.Done()
// either
select {
case ch <- fn1(123):
// fn1 returned before the context timed out
case <-ctx.Done():
// the context timed out before fn1 returned. if you want to keep going
// after the context times out, remove this case statement
return
}()
wg.Add(1)
go func() {
defer wg.Done()
select {
case ch <- fn2(456):
// fn2 returned before the context timed out
case <- ctx.Done():
// the context timed out before fn2 returned. if you want to keep going
// after the context times out, remove this case statement
return
}
}()
// wait asynchronously for all the functions to be done and close the channel when they are
//
// this only matters if the functions return before the ctx times out
go func() {
wg.Wait()
close(ch)
}()
/////
// fan in
/////
ret := []int{}
for i := range ch {
select {
case i, ok <- ch:
if !ok {
// this means the channel is closed, so all the functions returned before
// the timeout. the i variable will be empty, but ret has everything we need so we
// can return it
return ret
}
ret = append(ret, i)
case <-ctx.Done():
// time's up, break out of the loop
//
// instead of breaking here, you can start up a background goroutine
// to wait for the rest of the results and put them into a queue or something
break
}
}
return ret
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment