Skip to content

Instantly share code, notes, and snippets.

@skaji
Created July 2, 2019 09:36
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save skaji/a948a8921d66fbeff3243410b8f5afdf to your computer and use it in GitHub Desktop.
Save skaji/a948a8921d66fbeff3243410b8f5afdf to your computer and use it in GitHub Desktop.
package main
import (
"context"
"fmt"
"time"
)
// Payload types passed between the pipeline stages.
type (
	// event is the item type emitted on the channel returned by newSource.
	event string

	// message is the item type emitted on the channel returned by process
	// and consumed by notify.
	message string
)
// newSource starts the first pipeline stage. It launches a goroutine that,
// once per second, sends the string form of the tick time as an event on the
// returned channel. The goroutine exits and the channel is closed when ctx is
// done.
func newSource(ctx context.Context) <-chan event {
	ch := make(chan event)
	go func() {
		defer close(ch)
		ticker := time.NewTicker(time.Second)
		// Stop the ticker on exit so its resources are released.
		defer ticker.Stop()
		for {
			select {
			case <-ctx.Done():
				return
			case t := <-ticker.C:
				// Guard the send with ctx as well: if the consumer has
				// stopped reading, an unconditional send on the unbuffered
				// channel would block this goroutine forever.
				select {
				case ch <- event(t.String()):
				case <-ctx.Done():
					return
				}
			}
		}
	}()
	return ch
}
// process starts the middle pipeline stage. It launches a goroutine that
// receives every event from eventChan, appends ", from processor", and sends
// the result as a message on the returned channel. The returned channel is
// closed once eventChan has been closed and drained.
func process(eventChan <-chan event) <-chan message {
	out := make(chan message)
	go func() {
		defer close(out)
		for {
			ev, ok := <-eventChan
			if !ok {
				// Upstream is closed and fully drained.
				return
			}
			out <- message(ev) + ", from processor"
		}
	}()
	return out
}
// notify starts the final pipeline stage. It launches a goroutine that prints
// each message received from messageChan to standard output with
// ", from notifier" appended. The returned channel is closed once messageChan
// has been closed and drained, so callers can wait on it.
func notify(messageChan <-chan message) <-chan struct{} {
	finished := make(chan struct{})
	drain := func() {
		defer close(finished)
		for {
			msg, open := <-messageChan
			if !open {
				return
			}
			fmt.Println(msg + ", from notifier")
		}
	}
	go drain()
	return finished
}
func main() {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
source := newSource(ctx)
processed := process(source)
done := notify(processed)
<-done
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment