Skip to content

Instantly share code, notes, and snippets.

@wtask
Last active July 2, 2019 21:56
Show Gist options
  • Save wtask/1485b1aed2679146de6950869061c5a9 to your computer and use it in GitHub Desktop.
Save wtask/1485b1aed2679146de6950869061c5a9 to your computer and use it in GitHub Desktop.
Abstraction for help to manage concurrency as scopes of dependencies
package main
import (
"context"
"fmt"
"sync"
"time"
)
// Background - abstraction for concurrency scope
type Background struct {
ctx context.Context
ctxCancel context.CancelFunc
scope sync.WaitGroup
}
// Stop - stop trigger
type Stop func()
// NewBackground - concurrency scope builder
func NewBackground() (*Background, Stop) {
ctx, cancel := context.WithCancel(context.Background())
b := &Background{
ctx: ctx,
ctxCancel: cancel,
scope: sync.WaitGroup{},
}
return b,
Stop(func() {
b.ctxCancel()
b.scope.Wait()
})
}
// Context - return background context
func (b *Background) Context() context.Context {
return b.ctx
}
// Add - register "worker(s)" for scope
func (b *Background) Add(delta int) {
b.scope.Add(delta)
}
// Done - register single worker is done
func (b *Background) Done() {
b.scope.Done()
}
func writer(bg *Background, id string, data chan<- int) {
defer bg.Done()
// depend on write layer
for i := 0; ; i++ {
Loop:
for {
select {
case data <- i:
break Loop
case <-bg.Context().Done():
fmt.Printf("%q stopped on %d due to background context is done\n", id, i)
return
}
}
}
}
func reader(bg *Background, id string, data <-chan int) {
defer bg.Done()
for {
select {
case i, ok := <-data:
if !ok {
fmt.Printf("Reader %q stopped due to data channel was closed\n", id)
return
}
fmt.Printf("Reader %q received value: %d\n", id, i)
case <-bg.Context().Done():
fmt.Printf("Reader %q stopped due to background context is done\n", id)
return
}
}
}
func main() {
pipe, withoutSender := make(chan int), make(chan int)
writeBg, writeStop := NewBackground()
readBg, readStop := NewBackground()
readBg.Add(2)
go reader(readBg, "**READER*", pipe)
go reader(readBg, "**BLOCKED*READER**", withoutSender)
writeBg.Add(1)
go writer(writeBg, "**WRITER*", pipe)
time.Sleep(10 * time.Millisecond)
writeStop()
readStop()
// here now you safely close channels, if you want
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment