@JensRantil
Last active August 18, 2019 01:44
Simplification of the sample code at https://blog.gopheracademy.com/advent-2015/automi-stream-processing-over-go-channels/ Best practice is 1) to inject channels and 2) to avoid concurrency in APIs.
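package main

import (
	"fmt"
	"sync"
)

// ingest sends batches of words to out and closes it when done.
// out must be send-only (chan<-); a receive-only channel cannot be sent to or closed.
func ingest(out chan<- []string) {
	out <- []string{"aaaa", "bbb"}
	out <- []string{"cccccc", "dddddd"}
	out <- []string{"e", "fffff", "g"}
	close(out)
}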
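// process sends the length of every word received on in to out.
// It does not close out, because several workers may share the same channel.
func process(in <-chan []string, out chan<- int) {
	for data := range in {
		for _, word := range data {
			out <- len(word)
		}
	}
}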
// store prints every value received on in until in is closed.
func store(in <-chan int) {
	for data := range in {
		fmt.Println(data)
	}
}
func main() {
	concurrency := 4

	// stage 1 - ingest data from source
	in := make(chan []string)
	go ingest(in)

	// stage 2 - process data
	reduced := make(chan int)
	var wg sync.WaitGroup
	wg.Add(concurrency)
	for i := 0; i < concurrency; i++ {
		go func() {
			defer wg.Done()
			process(in, reduced)
		}()
	}
	// close reduced once every worker has returned, so that store's loop ends
	go func() {
		wg.Wait()
		close(reduced)
	}()

	// stage 3 - store
	store(reduced)
}
dndungu commented Jan 18, 2018

Good stuff! Thanks for sharing this.
