Skip to content

Instantly share code, notes, and snippets.

@bruth
Last active December 23, 2015 01:39
Show Gist options
  • Save bruth/6561332 to your computer and use it in GitHub Desktop.
Save bruth/6561332 to your computer and use it in GitHub Desktop.
Go fan-in function
package main
import (
"log"
"sync"
"time"
)
// fanIn takes zero or more channels and merges the received data to a
// single output channel. For efficiency, the output channel should be
// buffered to the number of inputs to prevent goroutines blocking each
// other.
func fanIn(inputs []chan []byte, output chan []byte, exit chan bool, timeout time.Duration) {
if len(inputs) == 0 {
log.Println("zero inputs")
return
}
defer log.Println("cleaning up fanIn")
// Always signal the exit
defer func() {
exit <- true
}()
// Used to signal goroutines to exit
signal := make(chan struct{})
// Wait group for spawned routines used after exit is signaled
wg := sync.WaitGroup{}
wg.Add(len(inputs))
// Spawn goroutines for each input channel
for i, input := range inputs {
log.Println("spawning input", i)
// Spawn go routine for each input
go func(input chan []byte, i int) {
defer log.Println("closing input", i)
defer wg.Done()
open := true
// for-select idiom to constantly receive off the input
// channel until it is closed on it has been signaled
// to exit
for open {
select {
case value, open := <-input:
// Input is closed, break
if !open {
log.Println("(closed) input", i)
break
}
output <- value
log.Printf("input %d -> %d\n", i, value)
case <-signal:
log.Println("(signaled) input", i)
open = false
default:
open = false
}
}
}(input, i)
}
// The exit channel is expected to send a true value and wait
// until it receives a response, however if it is closed,
// immediately signal the goroutines.
if _, ok := <-exit; !ok {
log.Println("exit channel closed")
close(signal)
} else if timeout > 0 {
log.Println("timeout of", timeout, "started")
<-time.After(timeout)
close(signal)
}
// Wait until all routines are done and exit
log.Println("waiting for goroutines to finish")
wg.Wait()
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment