Skip to content

Instantly share code, notes, and snippets.

@gja
Last active December 15, 2016 16:13
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save gja/da3c8ed6749505b147a571a91529a4b9 to your computer and use it in GitHub Desktop.
Save gja/da3c8ed6749505b147a571a91529a4b9 to your computer and use it in GitHub Desktop.
Program Showing Of Go Pipelining
package main
import (
"fmt"
"time"
"sync"
)
func producer(output_channel chan <- int) {
for i := 0; i < 100; i++ {
output_channel <- i
}
close(output_channel)
}
func process(input_channel <- chan int, output_channel chan <- int) {
for n := range input_channel {
time.Sleep(10000000)
output_channel <- n + 1
}
close(output_channel)
}
func consumer(input_channel <- chan int, output_channel chan <- bool) {
for n := range input_channel {
fmt.Println(n)
}
close(output_channel)
}
// ripped from https://blog.golang.org/pipelines
func merge(cs []chan int, size int) <-chan int {
var wg sync.WaitGroup
out := make(chan int, size)
// Start an output goroutine for each input channel in cs. output
// copies values from c to out until c is closed, then calls wg.Done.
output := func(c <-chan int) {
for n := range c {
out <- n
}
wg.Done()
}
wg.Add(len(cs))
for _, c := range cs {
go output(c)
}
// Start a goroutine to close out once all the output goroutines are
// done. This must start after the wg.Add call.
go func() {
wg.Wait()
close(out)
}()
return out
}
func main() {
producer_channel := make(chan int, 5)
go producer(producer_channel)
processor_channels := make([] chan int, 5)
for i:= 0; i < 5; i++ {
processor_channels[i] = make(chan int)
go process(producer_channel, processor_channels[i])
}
output_channel := merge(processor_channels, 5)
main_blocking_channel := make(chan bool)
go consumer(output_channel, main_blocking_channel)
// Only the final block needs an unnecessary blocking channel
<- main_blocking_channel
}
@gja
Copy link
Author

gja commented Dec 15, 2016

This program shows off the reason to close go channels. The main reason is to be able to run each block similar to a UNIX filter, as follows:

func process(input_channel <- chan int, output_channel chan <- int) {
  for n := range input_channel {     
      output_channel <- doSomething(n)
  }

  close(output_channel)
}

This relies on two steps (from https://blog.golang.org/pipelines)

  • stages close their outbound channels when all the send operations are done.
  • stages keep receiving values from inbound channels until those channels are closed.

Ergo, close all channels as you are done with them

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment