Skip to content

Instantly share code, notes, and snippets.

@WKBae
Created March 29, 2020 12:37
Show Gist options
  • Save WKBae/b277c962616a6df8a06d7147419b1fd6 to your computer and use it in GitHub Desktop.
Save WKBae/b277c962616a6df8a06d7147419b1fd6 to your computer and use it in GitHub Desktop.
Unlimited buffered channel in Go
package main
import (
"fmt"
"sync"
"time"
)
func unlimitedBufferer(in <-chan int, out chan<- int) {
var buffer []int
// consume first item from `in`.
ConsumingLoop:
for item := range in {
buffer = append(buffer, item)
// read from `in` or write to `out`, whichever comes available first, until the buffer becomes empty.
for len(buffer) > 0 {
select {
case item, ok := <-in:
if !ok {
// if input channel is closed, exit comsuming loop
break ConsumingLoop
}
buffer = append(buffer, item)
case out <- buffer[0]:
buffer = buffer[1:]
}
}
}
// channel `in` is closed; flush out remaining items to `out`.
for _, item := range buffer {
out <- item
}
// finally close `out`.
close(out)
}
func main() {
inputCh := make(chan int)
outputCh := make(chan int)
go unlimitedBufferer(inputCh, outputCh)
wg := &sync.WaitGroup{}
wg.Add(1)
go fastProducer(wg, inputCh, 0, 10)
wg.Add(1)
go slowConsumer(wg, outputCh)
wg.Wait()
fmt.Println("Finished!")
}
func fastProducer(wg *sync.WaitGroup, ch chan<- int, start, end int) {
defer wg.Done()
for i := start; i < end; i++ {
ch <- i
}
close(ch)
fmt.Println("Producer completed!")
}
func slowConsumer(wg *sync.WaitGroup, ch <-chan int) {
defer wg.Done()
for item := range ch {
time.Sleep(100 * time.Millisecond)
fmt.Println(item)
}
fmt.Println("Consumer completed!")
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment