Skip to content

Instantly share code, notes, and snippets.

Last active December 23, 2015 01:39
Show Gist options
  • Save bruth/6561332 to your computer and use it in GitHub Desktop.
Save bruth/6561332 to your computer and use it in GitHub Desktop.
Go fan-in function
package main
import (
	"log"
	"sync"
	"time"
)
// fanIn merges the data received on every channel in inputs onto the single
// output channel. For efficiency, output should be buffered to the number of
// inputs so the forwarding goroutines do not block each other.
//
// Shutdown is driven by the exit channel:
//   - The caller sends a value on exit to request a stop. fanIn then waits
//     for timeout (when positive), stops all forwarding goroutines, waits for
//     them to finish and finally sends true back on exit as the response, so
//     the caller must receive from exit after requesting the stop.
//   - If exit is closed instead, the goroutines are stopped immediately and
//     no response is sent (sending on a closed channel would panic).
//
// With zero inputs the function logs and returns immediately without touching
// exit. Values that are still waiting to be forwarded when the stop is
// signaled may be dropped.
func fanIn(inputs []chan []byte, output chan []byte, exit chan bool, timeout time.Duration) {
	if len(inputs) == 0 {
		log.Println("zero inputs")
		return
	}

	defer log.Println("cleaning up fanIn")

	// Respond on exit when returning, but only while the channel is known to
	// be open: a send on a closed channel panics.
	exitOpen := true
	defer func() {
		if exitOpen {
			exit <- true
		}
	}()

	// Closed to tell every forwarding goroutine to stop.
	signal := make(chan struct{})

	// Tracks the forwarding goroutines so we can wait for them on shutdown.
	var wg sync.WaitGroup

	// Spawn one forwarding goroutine per input channel.
	for i, input := range inputs {
		log.Println("spawning input", i)
		wg.Add(1)

		go func(input chan []byte, i int) {
			// Deferred calls run last-in first-out: the log line is written
			// before Done, so all logging has finished once Wait returns.
			defer wg.Done()
			defer log.Println("closing input", i)

			// Receive from the input until it is closed or the goroutine
			// has been signaled to exit.
			for {
				select {
				case value, ok := <-input:
					if !ok {
						// Return rather than break: a bare break would only
						// leave the select and spin on the closed channel.
						log.Println("(closed) input", i)
						return
					}

					// Keep watching the signal while sending so a full or
					// undrained output cannot block shutdown forever.
					select {
					case output <- value:
						log.Printf("input %d -> %d\n", i, value)
					case <-signal:
						log.Println("(signaled) input", i)
						return
					}
				case <-signal:
					log.Println("(signaled) input", i)
					return
				}
			}
		}(input, i)
	}

	// Block until the caller requests the exit by sending a value, or until
	// the exit channel is closed.
	if _, ok := <-exit; !ok {
		log.Println("exit channel closed")
		exitOpen = false
	} else if timeout > 0 {
		log.Println("timeout of", timeout, "started")
		// NOTE(review): timeout is treated as a grace period during which
		// the inputs keep being forwarded before the goroutines are
		// signaled - confirm this matches the intended semantics.
		time.Sleep(timeout)
	}

	close(signal)

	// Wait until all goroutines are done, then return.
	log.Println("waiting for goroutines to finish")
	wg.Wait()
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment