Skip to content

Instantly share code, notes, and snippets.

@thuan1412
Last active February 12, 2023 15:08
Show Gist options
  • Save thuan1412/8cfc34d5f109dc9d7d66d07598b15acf to your computer and use it in GitHub Desktop.
Save thuan1412/8cfc34d5f109dc9d7d66d07598b15acf to your computer and use it in GitHub Desktop.
Go Concurrency Pattern
package main
import "sync"
func main() {
}
func fanIn(
done <-chan interface{},
channels ...<-chan interface{},
) <-chan interface{} {
var wg sync.WaitGroup
multiplexedStream := make(chan interface{})
multiplex := func(c <-chan interface{}) {
defer wg.Done()
for i := range c {
select {
case <-done:
return
case multiplexedStream <- i:
}
}
}
wg.Add(len(channels))
for _, c := range channels {
go multiplex(c)
}
go func() {
wg.Wait()
close(multiplexedStream)
}()
return multiplexedStream
}
package main
import (
"fmt"
"time"
)
func main() {
generator := func(done <-chan interface{}, integers ...int) <-chan int {
intStream := make(chan int)
go func() {
defer close(intStream)
for _, i := range integers {
select {
case <-done:
fmt.Println("waiting for done")
return
case intStream <- i:
}
}
}()
return intStream
}
multiply := func(
done <-chan interface{},
intStream <-chan int,
multiplier int) <-chan int {
multipliedStream := make(chan int)
go func() {
defer close(multipliedStream)
for i := range intStream {
select {
case <-done:
fmt.Println("waiting for done mul")
return
case multipliedStream <- i * multiplier:
}
}
}()
return multipliedStream
}
add := func(
done <-chan interface{},
intStream <-chan int,
additive int,
) <-chan int {
addedStream := make(chan int)
go func() {
defer close(addedStream)
for i := range intStream {
select {
case <-done:
fmt.Println("waiting for done add")
return
case addedStream <- i + additive:
}
}
}()
return addedStream
}
done := make(chan interface{})
defer close(done)
intStream := generator(done, 1, 2, 3, 4)
pipeline := multiply(done, add(done, multiply(done, intStream, 2), 1), 2)
for v := range pipeline {
fmt.Println(v)
}
time.Sleep(1 * time.Second)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment