Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
package main
import (
"fmt"
"math/rand"
"time"
)
// sortedChan performs a k-way merge over inputChans and returns a channel
// that yields the merged stream.
//
// One pending ("head") value is held per input channel. On every step the
// smallest head among the channels that are still open is sent to the
// returned channel, and the channel it came from is read again to refill its
// slot. Ties go to the channel with the lowest index. The merged output is
// therefore in ascending order as long as each input channel delivers its own
// values in ascending order.
//
// The returned channel is unbuffered and is closed once every input channel
// has been closed and drained. With no input channels it is closed right away.
// The merge runs in its own goroutine, which blocks while waiting for the
// next value from an input or for the consumer to receive from the output.
func sortedChan(inputChans []<-chan int) <-chan int {
	merged := make(chan int)

	go func() {
		defer close(merged)

		heads := make([]int, len(inputChans))
		live := make([]bool, len(inputChans))
		remaining := 0

		// Prime every slot with the first value of its channel. A channel
		// that is already closed is never counted as live.
		for idx, src := range inputChans {
			heads[idx], live[idx] = <-src
			if live[idx] {
				remaining++
			}
		}

		// Keep going while at least one input channel is still open.
		for remaining > 0 {
			// Pick the live slot holding the smallest pending value.
			best := -1
			for idx, isLive := range live {
				if !isLive {
					continue
				}
				if best < 0 || heads[idx] < heads[best] {
					best = idx
				}
			}

			merged <- heads[best]

			// Refill the slot that was just consumed from its own channel.
			heads[best], live[best] = <-inputChans[best]
			if !live[best] {
				remaining--
			}
		}
	}()

	return merged
}
// main demonstrates sortedChan: it starts ten producer goroutines, merges
// their channels, and prints every merged value on its own line.
func main() {
	// 10 input channels; channel i emits the ten numbers i*10 .. i*10+9 in
	// ascending order, pausing for a random delay after each send.
	chans := make([]<-chan int, 10)
	for i := 0; i < 10; i++ {
		ch := make(chan int)
		chans[i] = ch
		go func(base int, ch chan int) {
			// The producer is the only sender, so it closes the channel.
			defer close(ch)
			for j := 0; j < 10; j++ {
				ch <- base*10 + j
				// Scale in floating point BEFORE converting to Duration.
				// Converting rand.Float64() (a value in [0,1)) directly to an
				// integer Duration truncates it to 0, which would make the
				// sleep a no-op.
				time.Sleep(time.Duration(rand.Float64() * float64(time.Second)))
			}
		}(i, ch)
	}

	// Receive and print the merged stream until sortedChan closes it.
	for outValue := range sortedChan(chans) {
		fmt.Println(outValue)
	}
}
@leonid-shevtsov

This comment has been minimized.

Copy link
Owner Author

commented Dec 25, 2017

go run sorted_streams.go

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.