Skip to content

Instantly share code, notes, and snippets.

@Yapcheekian
Created May 22, 2021 13:20
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Yapcheekian/31cba364a4236f48f4ab173e73de27bf to your computer and use it in GitHub Desktop.
Save Yapcheekian/31cba364a4236f48f4ab173e73de27bf to your computer and use it in GitHub Desktop.
How to merge N channels into a single one
package main
import (
"fmt"
"math/rand"
"reflect"
"sync"
"time"
)
// main builds three producer channels with asChan, fans them into one stream
// with mergeRelect, and prints each value as it arrives until the merged
// stream is closed.
func main() {
	first := asChan(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
	second := asChan(11, 12, 13, 14, 15, 16, 17, 18)
	third := asChan(21, 22, 23, 24, 25, 26, 27, 28, 29)

	merged := mergeRelect(first, second, third)
	for {
		value, open := <-merged
		if !open {
			return
		}
		fmt.Println(value)
	}
}
// asChan returns an unbuffered channel that yields the given values in order.
// A background goroutine sends each value, pauses for a random duration below
// one second after every send, and closes the channel once all values are sent.
func asChan(v ...int) <-chan int {
	out := make(chan int)
	go func() {
		// Closing on exit tells receivers that no more values will arrive.
		defer close(out)
		for i := range v {
			out <- v[i]
			pause := time.Duration(rand.Intn(1000)) * time.Millisecond
			time.Sleep(pause)
		}
	}()
	return out
}
// merge fans in every channel in chans onto a single unbuffered channel.
// One goroutine per input forwards its values; the returned channel is closed
// after all inputs have been closed and drained. Ordering across inputs is
// whatever the scheduler produces.
func merge(chans ...<-chan int) <-chan int {
	merged := make(chan int)

	var pending sync.WaitGroup
	forward := func(src <-chan int) {
		defer pending.Done()
		for val := range src {
			merged <- val
		}
	}

	pending.Add(len(chans))
	for _, src := range chans {
		go forward(src)
	}

	// Close from a separate goroutine so the caller gets the channel right
	// away and can start receiving while the forwarders are still running.
	go func() {
		pending.Wait()
		close(merged)
	}()

	return merged
}
// mergeRelect fans in every channel in chans onto a single unbuffered channel
// using reflect.Select, so a single goroutine serves any number of inputs.
//
// The returned channel is closed once every input has been closed and
// drained; with no inputs it is closed immediately. Ordering across inputs is
// whichever ready case reflect.Select picks. The caller must keep receiving
// until the channel is closed, otherwise the forwarding goroutine stays
// blocked on its send.
func mergeRelect(chans ...<-chan int) <-chan int {
	out := make(chan int)

	cases := make([]reflect.SelectCase, 0, len(chans))
	for _, c := range chans {
		cases = append(cases, reflect.SelectCase{
			Dir:  reflect.SelectRecv,
			Chan: reflect.ValueOf(c),
		})
	}

	// The select loop must run concurrently with the caller: out is
	// unbuffered, so sending before out has been returned would block forever.
	go func() {
		// Exactly one close, after the loop ends. A defer inside the setup
		// loop would register one close per input and panic on the second.
		defer close(out)
		for len(cases) > 0 {
			i, v, ok := reflect.Select(cases)
			if !ok {
				// Input i is closed and drained; stop selecting on it.
				cases = append(cases[:i], cases[i+1:]...)
				continue
			}
			out <- int(v.Int())
		}
	}()

	return out
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment