Skip to content

Instantly share code, notes, and snippets.

@tcastelly
Forked from xeoncross/channels.go
Created June 17, 2020 20:40
Show Gist options
  • Save tcastelly/ed5a5d4ebaa7db4a3f2f672c8a2405d6 to your computer and use it in GitHub Desktop.
Save tcastelly/ed5a5d4ebaa7db4a3f2f672c8a2405d6 to your computer and use it in GitHub Desktop.
Fastest way to merge multiple channels in golang.
package main
import (
"fmt"
"sync"
"sync/atomic"
)
// Fill Channel with int values
func fillChan(number int) <-chan int {
c := make(chan int)
go func() {
for i := 0; i < number; i++ {
c <- i
}
close(c)
}()
return c
}
// Create multiple channels and fill them
func createChannels(number, fill int) (chans []<-chan int) {
chans = make([]<-chan int, number)
for i := 0; i < number; i++ {
chans[i] = fillChan(fill)
}
return
}
/* Fail
func mergeTwo(a, b <-chan int) (c chan int) {
c = make(chan int)
go func() {
loop:
for {
select {
case c <- <-a:
//
case c <- <-b:
//
default:
break loop
}
}
close(c)
}()
return c
}
func mergeRec(chans ...<-chan int) <-chan int {
switch len(chans) {
case 0:
c := make(chan int)
close(c)
return c
case 1:
return chans[0]
default:
m := len(chans) / 2
return mergeTwo(
mergeRec(chans[:m]...),
mergeRec(chans[m:]...))
}
}
*/
func mergeWait(cs ...<-chan int) <-chan int {
out := make(chan int)
var wg sync.WaitGroup
wg.Add(len(cs))
for _, c := range cs {
go func(c <-chan int) {
for v := range c {
out <- v
}
wg.Done()
}(c)
}
go func() {
wg.Wait()
close(out)
}()
return out
}
func mergeAtomic(cs ...<-chan int) <-chan int {
out := make(chan int)
var i int32
atomic.StoreInt32(&i, int32(len(cs)))
for _, c := range cs {
go func(c <-chan int) {
for v := range c {
out <- v
}
if atomic.AddInt32(&i, -1) == 0 {
close(out)
}
}(c)
}
return out
}
func main() {
a := fillChan(2)
b := fillChan(2)
c := fillChan(2)
d := mergeWait(a, b, c)
for v := range d {
fmt.Println(v)
}
}
package main
import (
"fmt"
"testing"
)
func BenchmarkMerge(b *testing.B) {
merges := []struct {
name string
fun func(...<-chan int) <-chan int
}{
{"goroutines", mergeWait},
{"atomic", mergeAtomic},
// {"recursion", mergeRec},
}
for _, merge := range merges {
counter := 0
b.Run(merge.name, func(b *testing.B) {
for i := 0; i < b.N; i++ {
chans := createChannels(100, 100)
c := merge.fun(chans...)
counter = 0 // Reset each run
for range c {
counter++
}
}
})
fmt.Printf("%d results %s\n", counter, merge.name)
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment