Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Example in Go of how to close a channel written by several goroutines
// Package gochannels example of how to close a channel written by several goroutines
package gochannels
import (
"math/big"
"sync"
)
// Publisher write sequences of big.Int into a channel
type Publisher struct {
ch chan big.Int
closingCh chan interface{}
writersWG sync.WaitGroup
writersWGMutex sync.Mutex
}
// New creates a Publisher
func New() *Publisher {
return &Publisher{
ch: make(chan big.Int),
closingCh: make(chan interface{}),
}
}
// Run write into the channel the sequence 0..n-1
func (p *Publisher) Run(n int) {
for i := 0; i < n; i++ {
p.write(*big.NewInt(int64(i)))
}
}
// Read returns the channel to write
func (p *Publisher) Read() <-chan big.Int {
return p.ch
}
// write into the channel in a different goroutine
func (p *Publisher) write(data big.Int) {
go func(data big.Int) {
p.writersWGMutex.Lock()
p.writersWG.Add(1)
p.writersWGMutex.Unlock()
defer p.writersWG.Done()
select {
case <-p.closingCh:
return
default:
}
select {
case <-p.closingCh:
case p.ch <- data:
}
}(data)
}
// Closes channel, draining any blocked writes
func (p *Publisher) Close() {
close(p.closingCh)
go func() {
for range p.ch {
}
}()
p.writersWGMutex.Lock()
p.writersWG.Wait()
p.writersWGMutex.Unlock()
close(p.ch)
}
// CloseWithoutDraining closes channel, without draining any pending writes, this method
// will block until all writes have been unblocked by reads
func (p *Publisher) CloseWithoutDraining() {
close(p.closingCh)
p.writersWGMutex.Lock()
p.writersWG.Wait()
p.writersWGMutex.Unlock()
close(p.ch)
}
package gochannels
// In order to detect race conditions run the test with:
// go test -cpu=1,9,55,99 -race -count=100 -failfast
import (
"math/big"
"sync"
"testing"
)
func TestSimple(t *testing.T) {
consumer := func(pub *Publisher, n int, wg *sync.WaitGroup, result chan *big.Int) {
ch := pub.Read()
acc := big.NewInt(0)
for i := 0; i < n; i++ {
val := <-ch
t.Log(&val)
acc.Add(acc, &val)
}
wg.Done()
result <- acc
}
producer := func(pub *Publisher, n int, wg *sync.WaitGroup) {
pub.Run(n)
wg.Done()
}
precalc := func(n int) *big.Int {
acc := big.NewInt(0)
for i := 0; i < n; i++ {
acc.Add(acc, big.NewInt(int64(i)))
}
return acc
}
p := New()
var wg sync.WaitGroup
resultCh := make(chan *big.Int)
wg.Add(2)
go consumer(p, 100, &wg, resultCh)
go producer(p, 100, &wg)
wg.Wait()
p.CloseWithoutDraining()
result := <-resultCh
t.Log(result)
t.Log(precalc(100))
if result.Cmp(precalc(100)) != 0 {
t.Error("wrong result")
}
}
func TestIntermediate(t *testing.T) {
consumer := func(pub *Publisher, n int, wg *sync.WaitGroup, result chan *big.Int) {
ch := pub.Read()
acc := big.NewInt(0)
for i := 0; i < n; i++ {
val := <-ch
t.Log(&val)
acc.Add(acc, &val)
}
wg.Done()
result <- acc
}
producer := func(pub *Publisher, n int, wg *sync.WaitGroup) {
pub.Run(n)
wg.Done()
}
p := New()
var wg sync.WaitGroup
resultCh := make(chan *big.Int)
wg.Add(3)
go consumer(p, 100, &wg, resultCh)
go producer(p, 100, &wg)
go producer(p, 100, &wg)
<-resultCh
p.Close()
wg.Wait()
}
@anirbanroydas
Copy link

anirbanroydas commented Jun 21, 2020

@leolara First of all, what an excellent post https://www.leolara.me/blog/closing_a_go_channel_written_by_several_goroutines/

I have one question though, in the func (p *Publisher) Close() method, what if we put the channel close inside the critical section instead?
Like

        p.writersWGMutex.Lock()
	p.writersWG.Wait()
        close(p.ch) // <----------------------------- HERE
	p.writersWGMutex.Unlock()

@leolara
Copy link
Author

leolara commented Jun 21, 2020

Hi @anirbanroydas

I wrote this a long time ago, so I do not remember all details now.

I think close is non-blocking, so I think it would not make a diference, but as I said it was a long time ago

@rueian
Copy link

rueian commented Aug 22, 2021

Hi @leolara,

I also come here from the https://www.leolara.me/blog/closing_a_go_channel_written_by_several_goroutines/. It is really a great article.

And since you mentioned the usage of a WaitGroup, I come up with an idea to avoid using mutex: we could probably use a writing counter to do similar work.

type Publisher struct {
	ch chan big.Int

	state   int32
	writing int32
}

func (p *Publisher) write(n int) {
	atomic.AddInt32(&p.writing, 1)
	if atomic.LoadInt32(&p.state) == 0 {
		p.ch <- n
	}
	atomic.AddInt32(&p.writing, -1)
}

func (p *Publisher) Close() {
	if atomic.CompareAndSwapInt32(&p.state, 0, 1) {
		go func() {
			for range p.ch {
			}
		}()
	}
	for atomic.LoadInt32(&p.writing) != 0 {
		runtime.Gosched()
	}
	if atomic.CompareAndSwapInt32(&p.state, 1, 2) {
		close(p.ch)
	}
}

@leolara
Copy link
Author

leolara commented Aug 23, 2021

Hi @rueian,

I guess you do atomic.CompareAndSwapInt32(&p.state, 1, 2) in case Close is called more than once?

@rueian
Copy link

rueian commented Aug 24, 2021

Hi @rueian,

I guess you do atomic.CompareAndSwapInt32(&p.state, 1, 2) in case Close is called more than once?

Yes, you are right. Otherwise it may panic.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment