Skip to content

Instantly share code, notes, and snippets.

@christianklotz
Forked from sgarcez/dedup_pipeline.go
Last active December 22, 2017 18:39
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save christianklotz/179d39640515920a68e212a7e5e7dc52 to your computer and use it in GitHub Desktop.
Save christianklotz/179d39640515920a68e212a7e5e7dc52 to your computer and use it in GitHub Desktop.
package main
import (
"fmt"
"math/rand"
"time"
)
func generate(nums ...int) chan int {
out := make(chan int)
go func() {
defer close(out)
for _, n := range nums {
r := rand.Intn(500)
time.Sleep(time.Duration(r) * time.Millisecond)
fmt.Println("Generator: Sending", n)
out <- n
}
}()
return out
}
func process(in chan int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
vals := make(map[int]bool)
for n := range in {
if _, ok := vals[n]; !ok {
vals[n] = true
fmt.Println("Sending:", n)
out <- n
continue
}
fmt.Println("Duplicate, dropping:", n)
}
}()
return out
}
func consume(in <-chan int) {
for n := range in {
fmt.Println("Consumer: Received", n)
r := rand.Intn(2000)
time.Sleep(time.Duration(r) * time.Millisecond)
fmt.Println("Consumer: Completed", n)
}
}
func main() {
rand.Seed(time.Now().UTC().UnixNano())
c := process(generate(2, 3, 4, 4, 4, 5, 5, 6))
consume(c)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment